Mercurial > repos > bimib > cobraxy
changeset 419:ed2c1f9e20ba draft
Uploaded
author | francesco_lapi |
---|---|
date | Tue, 09 Sep 2025 09:08:17 +0000 |
parents | 919b5b71a61c |
children | 0877682fff48 |
files | COBRAxy/custom_data_generator_beta.py COBRAxy/flux_simulation_beta.py COBRAxy/flux_simulation_beta.xml COBRAxy/utils/general_utils.py COBRAxy/utils/model_utils.py |
diffstat | 5 files changed, 437 insertions(+), 366 deletions(-) [+] |
line wrap: on
line diff
--- a/COBRAxy/custom_data_generator_beta.py Tue Sep 09 07:36:30 2025 +0000 +++ b/COBRAxy/custom_data_generator_beta.py Tue Sep 09 09:08:17 2025 +0000 @@ -174,7 +174,7 @@ if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": - model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) + model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) # generate data rules = modelUtils.generate_rules(model, asParsed = False)
--- a/COBRAxy/flux_simulation_beta.py Tue Sep 09 07:36:30 2025 +0000 +++ b/COBRAxy/flux_simulation_beta.py Tue Sep 09 09:08:17 2025 +0000 @@ -9,7 +9,7 @@ from joblib import Parallel, delayed, cpu_count from cobra.sampling import OptGPSampler import sys -import utils.general_utils as utils +import utils.model_utils as model_utils ################################# process args ############################### @@ -29,6 +29,12 @@ parser.add_argument("-mo", "--model_upload", type = str, help = "path to input file with custom rules, if provided") + parser.add_argument("-mab", "--model_and_bounds", type = str, + choices = ['True', 'False'], + required = True, + help = "upload mode: True for model+bounds, False for complete models") + + parser.add_argument('-ol', '--out_log', help = "Output log") @@ -38,11 +44,11 @@ help = 'your tool directory') parser.add_argument('-in', '--input', - required = True, - type=str, - help = 'inputs bounds') + required = True, + type=str, + help = 'input bounds files or complete model files') - parser.add_argument('-ni', '--names', + parser.add_argument('-ni', '--name', required = True, type=str, help = 'cell names') @@ -215,9 +221,10 @@ pass -def model_sampler(model_input_original:cobra.Model, bounds_path:str, cell_name:str)-> List[pd.DataFrame]: + +def model_sampler_with_bounds(model_input_original: cobra.Model, bounds_path: str, cell_name: str) -> List[pd.DataFrame]: """ - Prepares the model with bounds from the dataset and performs sampling and analysis based on the selected algorithm. + MODE 1: Prepares the model with bounds from separate bounds file and performs sampling. Args: model_input_original (cobra.Model): The original COBRA model. @@ -230,26 +237,41 @@ model_input = model_input_original.copy() bounds_df = read_dataset(bounds_path, "bounds dataset") + + # Apply bounds to model for rxn_index, row in bounds_df.iterrows(): - model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound - model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound + try: + model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound + model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound + except KeyError: + warning(f"Warning: Reaction {rxn_index} not found in model. Skipping.") + return perform_sampling_and_analysis(model_input, cell_name) + + +def perform_sampling_and_analysis(model_input: cobra.Model, cell_name: str) -> List[pd.DataFrame]: + """ + Common function to perform sampling and analysis on a prepared model. + + Args: + model_input (cobra.Model): The prepared COBRA model with bounds applied. + cell_name (str): Name of the cell, used to generate filenames for output. + + Returns: + List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results. + """ if ARGS.algorithm == 'OPTGP': OPTGP_sampler(model_input, cell_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) - elif ARGS.algorithm == 'CBS': - CBS_sampler(model_input, cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) + CBS_sampler(model_input, cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) df_mean, df_median, df_quantiles = fluxes_statistics(cell_name, ARGS.output_types) if("fluxes" not in ARGS.output_types): - os.remove(ARGS.output_path + "/" + cell_name + '.csv') + os.remove(ARGS.output_path + "/" + cell_name + '.csv') - returnList = [] - returnList.append(df_mean) - returnList.append(df_median) - returnList.append(df_quantiles) + returnList = [df_mean, df_median, df_quantiles] df_pFBA, df_FVA, df_sensitivity = fluxes_analysis(model_input, cell_name, ARGS.output_type_analysis) @@ -332,7 +354,7 @@ model.objective = "Biomass" solution = cobra.flux_analysis.pfba(model) fluxes = solution.fluxes - df_pFBA.loc[0,[rxn._id for rxn in model.reactions]] = fluxes.tolist() + df_pFBA.loc[0,[rxn.id for rxn in model.reactions]] = fluxes.tolist() df_pFBA = df_pFBA.reset_index(drop=True) df_pFBA.index = [model_name] df_pFBA = df_pFBA.astype(float).round(6) @@ -371,38 +393,63 @@ None """ - num_processors = cpu_count() + num_processors = max(1, cpu_count() - 1) global ARGS ARGS = process_args(args) if not os.path.exists(ARGS.output_path): os.makedirs(ARGS.output_path) + + #ARGS.bounds = ARGS.input.split(",") + #ARGS.bounds_name = ARGS.name.split(",") + #ARGS.output_types = ARGS.output_type.split(",") + #ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") + + # --- Normalize inputs (the tool may pass comma-separated --input and either --name or --names) --- + ARGS.input_files = ARGS.input.split(",") if getattr(ARGS, "input", None) else [] + ARGS.file_names = ARGS.name.split(",") + # output types (required) -> list + ARGS.output_types = ARGS.output_type.split(",") if getattr(ARGS, "output_type", None) else [] + # optional analysis output types -> list or empty + ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") if getattr(ARGS, "output_type_analysis", None) else [] + - #model_type :utils.Model = ARGS.model_selector - #if model_type is utils.Model.Custom: - # model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext) - #else: - # model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir) + if ARGS.model_and_bounds == "True": + # MODE 1: Model + bounds (separate files) + print("=== MODE 1: Model + Bounds (separate files) ===") + + # Load base model + if not ARGS.model_upload: + sys.exit("Error: model_upload is required for Mode 1") - model = utils.build_cobra_model_from_csv(ARGS.model_upload) - - validation = utils.validate_model(model) + base_model = model_utils.build_cobra_model_from_csv(ARGS.model_upload) - print("\n=== VALIDAZIONE MODELLO ===") - for key, value in validation.items(): - print(f"{key}: {value}") + validation = model_utils.validate_model(base_model) + + print("\n=== VALIDAZIONE MODELLO ===") + for key, value in validation.items(): + print(f"{key}: {value}") + + #Set solver verbosity to 1 to see warning and error messages only. + base_model.solver.configuration.verbosity = 1 - #Set solver verbosity to 1 to see warning and error messages only. - model.solver.configuration.verbosity = 1 - - ARGS.bounds = ARGS.input.split(",") - ARGS.bounds_name = ARGS.names.split(",") - ARGS.output_types = ARGS.output_type.split(",") - ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") + # Process each bounds file with the base model + results = Parallel(n_jobs=num_processors)( + delayed(model_sampler_with_bounds)(base_model, bounds_file, cell_name) + for bounds_file, cell_name in zip(ARGS.input_files, ARGS.file_names) + ) + else: + # MODE 2: Multiple complete models + print("=== MODE 2: Multiple complete models ===") + + # Process each complete model file + results = Parallel(n_jobs=num_processors)( + delayed(perform_sampling_and_analysis)(model_utils.build_cobra_model_from_csv(model_file), cell_name) + for model_file, cell_name in zip(ARGS.input_files, ARGS.file_names) + ) - results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model, bounds_path, cell_name) for bounds_path, cell_name in zip(ARGS.bounds, ARGS.bounds_name)) all_mean = pd.concat([result[0] for result in results], ignore_index=False) all_median = pd.concat([result[1] for result in results], ignore_index=False)
--- a/COBRAxy/flux_simulation_beta.xml Tue Sep 09 07:36:30 2025 +0000 +++ b/COBRAxy/flux_simulation_beta.xml Tue Sep 09 09:08:17 2025 +0000 @@ -4,29 +4,41 @@ <import>marea_macros.xml</import> </macros> - <requirements> + <requirements> <requirement type="package" version="1.24.4">numpy</requirement> <requirement type="package" version="2.0.3">pandas</requirement> - <requirement type="package" version="0.29.0">cobra</requirement> + <requirement type="package" version="0.29.0">cobra</requirement> <requirement type="package" version="5.2.2">lxml</requirement> <requirement type="package" version="1.4.2">joblib</requirement> <requirement type="package" version="1.11">scipy</requirement> - </requirements> + </requirements> <command detect_errors="exit_code"> <![CDATA[ python $__tool_directory__/flux_simulation_beta.py --tool_dir $__tool_directory__ - --model_upload $model_upload - --input "${",".join(map(str, $inputs))}" - #set $names = "" - #for $input_temp in $inputs: - #set $names = $names + $input_temp.element_identifier + "," - #end for - --name $names + --model_and_bounds $model_and_bounds.model_and_bounds + + #if $model_and_bounds.model_and_bounds == 'True': + --model_upload $model_and_bounds.model_upload + --input "${",".join(map(str, $model_and_bounds.inputs))}" + #set $names = "" + #for $input_temp in $model_and_bounds.inputs: + #set $names = $names + $input_temp.element_identifier + "," + #end for + --name $names + #else: + --input "${",".join(map(str, $model_and_bounds.model_files))}" + #set $names = "" + #for $input_temp in $model_and_bounds.model_files: + #set $names = $names + $input_temp.element_identifier + "," + #end for + --name $names + #end if + --thinning 0 #if $algorithm_param.algorithm == 'OPTGP': - --thinning $algorithm_param.thinning + --thinning $algorithm_param.thinning #end if --algorithm $algorithm_param.algorithm --n_batches $n_batches @@ -37,31 +49,44 @@ --out_log $log ]]> </command> + <inputs> + <conditional name="model_and_bounds"> + <param name="model_and_bounds" argument="--model_and_bounds" type="select" label="Upload mode:" help="Choose whether to upload the model and bounds in separate files or to upload multiple complete model files."> + <option value="True" selected="true">Model + bounds (separate files)</option> + <option value="False">Multiple complete models</option> + </param> + + <when value="True"> + <param name="model_upload" argument="--model_upload" type="data" format="csv,tsv,tabular" + label="Model (rules) file:" + help="Upload a CSV/TSV file that contains the model reaction rules. Recommended columns: ReactionID, Reaction (formula), Rule (GPR). Optional columns: name, lower_bound, upper_bound, InMedium. If bounds are present here they may be overridden by separate bound files." /> - <param name="model_upload" argument="--model_upload" type="data" format="csv,tsv,tabular" - label="Model rules file:" help="Upload a CSV/TSV file containing reaction rules generated by the Model Initialization tool." /> + <param name="inputs" argument="--inputs" multiple="true" type="data" format="tabular,csv,tsv" + label="Bound file(s):" + help="Upload one or more CSV/TSV files containing reaction bounds. Each file must include at least: ReactionID, lower_bound, upper_bound. Files are applied in the order provided; later files override earlier ones for the same ReactionID." /> + </when> - <param name="inputs" argument="--inputs" multiple="true" type="data" format="tabular, csv, tsv" label="Bound(s):" /> - - + <when value="False"> + <param name="model_files" argument="--model_files" multiple="true" type="data" format="csv,tsv,tabular" + label="Complete model files:" + help="Upload one or more CSV/TSV files, each containing both model rules and reaction bounds for different contexts/cells. Required columns: ReactionID, Reaction, Rule, lower_bound, upper_bound." /> + </when> + </conditional> + <conditional name="algorithm_param"> - <param name="algorithm" argument="--algorithm" type="select" label="Choose sampling algorithm:"> - <option value="CBS" selected="true">CBS</option> - <option value="OPTGP">OPTGP</option> - </param> - <when value="OPTGP"> - <param name="thinning" argument="--thinning" type="integer" label="Thinning:" value="100" help="Number of iterations to wait before taking a sample."/> - </when> - - </conditional> - + <param name="algorithm" argument="--algorithm" type="select" label="Choose sampling algorithm:"> + <option value="CBS" selected="true">CBS</option> + <option value="OPTGP">OPTGP</option> + </param> + <when value="OPTGP"> + <param name="thinning" argument="--thinning" type="integer" label="Thinning:" value="100" help="Number of iterations to wait before taking a sample."/> + </when> + </conditional> <param name="n_samples" argument="--n_samples" type="integer" label="Samples:" value="1000"/> - - <param name="n_batches" argument="--n_batches" type="integer" label="Batches:" value="1" help="This is useful for computational perfomances."/> - - <param name="seed" argument="--seed" type="integer" label="Seed:" value="0" helph="Random seed."/> + <param name="n_batches" argument="--n_batches" type="integer" label="Batches:" value="1" help="This is useful for computational performances."/> + <param name="seed" argument="--seed" type="integer" label="Seed:" value="0" help="Random seed."/> <param type="select" argument="--output_types" multiple="true" name="output_types" label="Desired outputs from sampling"> <option value="mean" selected="true">Mean</option> @@ -77,15 +102,11 @@ </param> </inputs> - <outputs> <data format="txt" name="log" label="Flux Simulation - Log" /> - <data name="output" format="tabular" label="Flux Simulation - Output"> - <discover_datasets pattern="__name_and_ext__" - directory="flux_simulation" visible="true" /> + <discover_datasets pattern="__name_and_ext__" directory="flux_simulation" visible="true" /> </data> - </outputs> <help> @@ -93,21 +114,21 @@ What it does ------------- -This tool generates flux samples starting from a model in JSON or XML format by using CBS (Corner-based sampling) or OPTGP (Improved Artificial Centering Hit-and-Run sampler) sampling algorithms. +This tool generates flux samples starting from metabolic models using CBS (Corner-based sampling) or OPTGP (Improved Artificial Centering Hit-and-Run sampler) algorithms. -It can return sampled fluxes by appliying summary statistics: +Two upload modes are supported: +1. **Model + bounds**: Upload one base model and multiple bound files (one per context/cell type) +2. **Multiple complete models**: Upload multiple complete model files, each with integrated bounds + +It can return sampled fluxes by applying summary statistics: - mean - median - - quantiles (0.25, 0.50, 0.75). + - quantiles (0.25, 0.50, 0.75) -Flux analysis can be perfomed over the metabolic model: - - parsimoniuos-FBA (optimized by Biomass) +Flux analysis can be performed over the metabolic model: + - parsimonious-FBA (optimized by Biomass) - FVA - - Biomass sensitivity analysis (single reaction knock-out). It is the ratio between the optimal of the Biomass reaction computed by FBA after knocking-out a reaction and the same over the complete model. - -Accepted files: - - A model: JSON, XML, MAT or YAML (.yml) file reporting reactions and rules contained in the model. Supported compressed formats: .zip, .gz and .bz2. Filename must follow the pattern: {model_name}.{extension}.[zip|gz|bz2] - - Context-specific bounds: generated by RAS to Bounds tool. This can be a collection of bounds too (one bounds file per context). + - Biomass sensitivity analysis (single reaction knock-out) Output: ------------- @@ -116,11 +137,9 @@ - Samples: reporting the sampled fluxes for each reaction (reaction names on the rows and sample names on the columns). Format: tab-separated. - a log file (.txt). -**TIP**: The Batches parameter is useful to mantain in memory just a batch of samples at time. For example, if you wish to sample 10.000 points, than it is suggested to select n_samples = 1.000 and n_batches=10. -**TIP**: The Thinning parameter of the OPTGP algorithm is useful to converge to a stationary distribution (see cited articles by Galuzzi, Milazzo and Damiani). - +**TIP**: The Batches parameter helps maintain memory efficiency. For 10,000 samples, use n_samples=1,000 and n_batches=10. +**TIP**: The Thinning parameter for OPTGP helps converge to stationary distribution. ]]> </help> <expand macro="citations_fluxes" /> - </tool> \ No newline at end of file
--- a/COBRAxy/utils/general_utils.py Tue Sep 09 07:36:30 2025 +0000 +++ b/COBRAxy/utils/general_utils.py Tue Sep 09 09:08:17 2025 +0000 @@ -704,275 +704,3 @@ def __str__(self) -> str: return self.value -def convert_genes(model,annotation): - from cobra.manipulation import rename_genes - model2=model.copy() - try: - dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} - except: - print("No annotation in gene dict!") - return -1 - rename_genes(model2,dict_genes) - - return model2 - - -def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobra.Model: - """ - Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. - - Args: - csv_path: Path al file CSV (separato da tab) - model_id: ID del modello da creare - - Returns: - cobra.Model: Il modello COBRApy costruito - """ - - # Leggi i dati dal CSV - df = pd.read_csv(csv_path, sep='\t') - - # Crea il modello vuoto - model = cobraModel(model_id) - - # Dict per tenere traccia di metaboliti e compartimenti - metabolites_dict = {} - compartments_dict = {} - - print(f"Costruendo modello da {len(df)} reazioni...") - - # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni - for idx, row in df.iterrows(): - reaction_formula = str(row['Reaction']).strip() - if not reaction_formula or reaction_formula == 'nan': - continue - - # Estrai metaboliti dalla formula della reazione - metabolites = extract_metabolites_from_reaction(reaction_formula) - - for met_id in metabolites: - compartment = extract_compartment_from_metabolite(met_id) - - # Aggiungi compartimento se non esiste - if compartment not in compartments_dict: - compartments_dict[compartment] = compartment - - # Aggiungi metabolita se non esiste - if met_id not in metabolites_dict: - metabolites_dict[met_id] = Metabolite( - id=met_id, - compartment=compartment, - name=met_id.replace(f"_{compartment}", "").replace("__", "_") - ) - - # Aggiungi compartimenti al modello - model.compartments = compartments_dict - - # Aggiungi metaboliti al modello - model.add_metabolites(list(metabolites_dict.values())) - - print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") - - # Seconda passata: aggiungi le reazioni - reactions_added = 0 - reactions_skipped = 0 - - for idx, row in df.iterrows(): - - reaction_id = str(row['ReactionID']).strip() - reaction_formula = str(row['Reaction']).strip() - - # Salta reazioni senza formula - if not reaction_formula or reaction_formula == 'nan': - raise ValueError(f"Formula della reazione mancante {reaction_id}") - - # Crea la reazione - reaction = Reaction(reaction_id) - reaction.name = reaction_id - - # Imposta bounds - reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 - reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 - - # Aggiungi gene rule se presente - if pd.notna(row['Rule']) and str(row['Rule']).strip(): - reaction.gene_reaction_rule = str(row['Rule']).strip() - - # Parse della formula della reazione - try: - parse_reaction_formula(reaction, reaction_formula, metabolites_dict) - except Exception as e: - print(f"Errore nel parsing della reazione {reaction_id}: {e}") - reactions_skipped += 1 - continue - - # Aggiungi la reazione al modello - model.add_reactions([reaction]) - reactions_added += 1 - - - print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") - - # Imposta l'obiettivo di biomassa - set_biomass_objective(model) - - # Imposta il medium - set_medium_from_data(model, df) - - print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") - - return model - - -# Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) -def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: - """ - Estrae gli ID dei metaboliti da una formula di reazione. - Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) - e permette che comincino con cifre o underscore. - """ - metabolites = set() - # coefficiente opzionale seguito da un token che termina con _<letters> - pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' - matches = re.findall(pattern, reaction_formula) - metabolites.update(matches) - return metabolites - - -def extract_compartment_from_metabolite(metabolite_id: str) -> str: - """ - Estrae il compartimento dall'ID del metabolita. - """ - # Il compartimento è solitamente l'ultima lettera dopo l'underscore - if '_' in metabolite_id: - return metabolite_id.split('_')[-1] - return 'c' # default cytoplasm - - -def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): - """ - Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. - """ - - if reaction.id == 'EX_thbpt_e': - print(reaction.id) - print(formula) - # Dividi in parte sinistra e destra - if '<=>' in formula: - left, right = formula.split('<=>') - reversible = True - elif '<--' in formula: - left, right = formula.split('<--') - reversible = False - left, right = left, right - elif '-->' in formula: - left, right = formula.split('-->') - reversible = False - elif '<-' in formula: - left, right = formula.split('<-') - reversible = False - left, right = left, right - else: - raise ValueError(f"Formato reazione non riconosciuto: {formula}") - - # Parse dei metaboliti e coefficienti - reactants = parse_metabolites_side(left.strip()) - products = parse_metabolites_side(right.strip()) - - # Aggiungi metaboliti alla reazione - metabolites_to_add = {} - - # Reagenti (coefficienti negativi) - for met_id, coeff in reactants.items(): - if met_id in metabolites_dict: - metabolites_to_add[metabolites_dict[met_id]] = -coeff - - # Prodotti (coefficienti positivi) - for met_id, coeff in products.items(): - if met_id in metabolites_dict: - metabolites_to_add[metabolites_dict[met_id]] = coeff - - reaction.add_metabolites(metabolites_to_add) - - -def parse_metabolites_side(side_str: str) -> Dict[str, float]: - """ - Parsa un lato della reazione per estrarre metaboliti e coefficienti. - """ - metabolites = {} - if not side_str or side_str.strip() == '': - return metabolites - - terms = side_str.split('+') - for term in terms: - term = term.strip() - if not term: - continue - - # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> - match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) - if match: - coeff_str, met_id = match.groups() - coeff = float(coeff_str) if coeff_str else 1.0 - metabolites[met_id] = coeff - - return metabolites - - - -def set_biomass_objective(model: Model): - """ - Imposta la reazione di biomassa come obiettivo. - """ - biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()] - - if biomass_reactions: - model.objective = biomass_reactions[0].id - print(f"Obiettivo impostato su: {biomass_reactions[0].id}") - else: - print("Nessuna reazione di biomassa trovata") - - -def set_medium_from_data(model: Model, df: pd.DataFrame): - """ - Imposta il medium basato sulla colonna InMedium. - """ - medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() - - medium_dict = {} - for rxn_id in medium_reactions: - if rxn_id in [r.id for r in model.reactions]: - reaction = model.reactions.get_by_id(rxn_id) - if reaction.lower_bound < 0: # Solo reazioni di uptake - medium_dict[rxn_id] = abs(reaction.lower_bound) - - if medium_dict: - model.medium = medium_dict - print(f"Medium impostato con {len(medium_dict)} componenti") - - -def validate_model(model: Model) -> Dict[str, any]: - """ - Valida il modello e fornisce statistiche di base. - """ - validation = { - 'num_reactions': len(model.reactions), - 'num_metabolites': len(model.metabolites), - 'num_genes': len(model.genes), - 'num_compartments': len(model.compartments), - 'objective': str(model.objective), - 'medium_size': len(model.medium), - 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), - 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), - } - - try: - # Test di crescita - solution = model.optimize() - validation['growth_rate'] = solution.objective_value - validation['status'] = solution.status - except Exception as e: - validation['growth_rate'] = None - validation['status'] = f"Error: {e}" - - return validation
--- a/COBRAxy/utils/model_utils.py Tue Sep 09 07:36:30 2025 +0000 +++ b/COBRAxy/utils/model_utils.py Tue Sep 09 09:08:17 2025 +0000 @@ -4,13 +4,16 @@ import pickle import argparse import pandas as pd -from typing import Optional, Tuple, Union, List, Dict +import re +from typing import Optional, Tuple, Union, List, Dict, Set import utils.general_utils as utils import utils.rule_parsing as rulesUtils +import utils.reaction_parsing as reactionUtils +from cobra import Model as cobraModel, Reaction, Metabolite ################################- DATA GENERATION -################################ ReactionId = str -def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: +def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: """ Generates a dictionary mapping reaction ids to rules from the model. @@ -34,7 +37,7 @@ for reaction in model.reactions if reaction.gene_reaction_rule } -def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: +def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: """ Generates a dictionary mapping reaction ids to reaction formulas from the model. @@ -56,7 +59,7 @@ return reactionUtils.create_reaction_dict(unparsedReactions) -def get_medium(model:cobra.Model) -> pd.DataFrame: +def get_medium(model:cobraModel) -> pd.DataFrame: trueMedium=[] for r in model.reactions: positiveCoeff=0 @@ -70,7 +73,7 @@ df_medium["reaction"] = trueMedium return df_medium -def generate_bounds(model:cobra.Model) -> pd.DataFrame: +def generate_bounds(model:cobraModel) -> pd.DataFrame: rxns = [] for reaction in model.reactions: @@ -84,7 +87,7 @@ -def generate_compartments(model: cobra.Model) -> pd.DataFrame: +def generate_compartments(model: cobraModel) -> pd.DataFrame: """ Generates a DataFrame containing compartment information for each reaction. Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) @@ -126,4 +129,278 @@ pathway_data.append(row) - return pd.DataFrame(pathway_data) \ No newline at end of file + return pd.DataFrame(pathway_data) + + + +def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: + """ + Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. + + Args: + csv_path: Path al file CSV (separato da tab) + model_id: ID del modello da creare + + Returns: + cobra.Model: Il modello COBRApy costruito + """ + + # Leggi i dati dal CSV + df = pd.read_csv(csv_path, sep='\t') + + # Crea il modello vuoto + model = cobraModel(model_id) + + # Dict per tenere traccia di metaboliti e compartimenti + metabolites_dict = {} + compartments_dict = {} + + print(f"Costruendo modello da {len(df)} reazioni...") + + # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni + for idx, row in df.iterrows(): + reaction_formula = str(row['Reaction']).strip() + if not reaction_formula or reaction_formula == 'nan': + continue + + # Estrai metaboliti dalla formula della reazione + metabolites = extract_metabolites_from_reaction(reaction_formula) + + for met_id in metabolites: + compartment = extract_compartment_from_metabolite(met_id) + + # Aggiungi compartimento se non esiste + if compartment not in compartments_dict: + compartments_dict[compartment] = compartment + + # Aggiungi metabolita se non esiste + if met_id not in metabolites_dict: + metabolites_dict[met_id] = Metabolite( + id=met_id, + compartment=compartment, + name=met_id.replace(f"_{compartment}", "").replace("__", "_") + ) + + # Aggiungi compartimenti al modello + model.compartments = compartments_dict + + # Aggiungi metaboliti al modello + model.add_metabolites(list(metabolites_dict.values())) + + print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") + + # Seconda passata: aggiungi le reazioni + reactions_added = 0 + reactions_skipped = 0 + + for idx, row in df.iterrows(): + + reaction_id = str(row['ReactionID']).strip() + reaction_formula = str(row['Reaction']).strip() + + # Salta reazioni senza formula + if not reaction_formula or reaction_formula == 'nan': + raise ValueError(f"Formula della reazione mancante {reaction_id}") + + # Crea la reazione + reaction = Reaction(reaction_id) + reaction.name = reaction_id + + # Imposta bounds + reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 + reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 + + # Aggiungi gene rule se presente + if pd.notna(row['Rule']) and str(row['Rule']).strip(): + reaction.gene_reaction_rule = str(row['Rule']).strip() + + # Parse della formula della reazione + try: + parse_reaction_formula(reaction, reaction_formula, metabolites_dict) + except Exception as e: + print(f"Errore nel parsing della reazione {reaction_id}: {e}") + reactions_skipped += 1 + continue + + # Aggiungi la reazione al modello + model.add_reactions([reaction]) + reactions_added += 1 + + + print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") + + # Imposta l'obiettivo di biomassa + set_biomass_objective(model) + + # Imposta il medium + set_medium_from_data(model, df) + + print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") + + return model + + +# Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) +def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: + """ + Estrae gli ID dei metaboliti da una formula di reazione. + Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) + e permette che comincino con cifre o underscore. + """ + metabolites = set() + # coefficiente opzionale seguito da un token che termina con _<letters> + pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' + matches = re.findall(pattern, reaction_formula) + metabolites.update(matches) + return metabolites + + +def extract_compartment_from_metabolite(metabolite_id: str) -> str: + """ + Estrae il compartimento dall'ID del metabolita. + """ + # Il compartimento è solitamente l'ultima lettera dopo l'underscore + if '_' in metabolite_id: + return metabolite_id.split('_')[-1] + return 'c' # default cytoplasm + + +def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): + """ + Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. + """ + + if reaction.id == 'EX_thbpt_e': + print(reaction.id) + print(formula) + # Dividi in parte sinistra e destra + if '<=>' in formula: + left, right = formula.split('<=>') + reversible = True + elif '<--' in formula: + left, right = formula.split('<--') + reversible = False + left, right = left, right + elif '-->' in formula: + left, right = formula.split('-->') + reversible = False + elif '<-' in formula: + left, right = formula.split('<-') + reversible = False + left, right = left, right + else: + raise ValueError(f"Formato reazione non riconosciuto: {formula}") + + # Parse dei metaboliti e coefficienti + reactants = parse_metabolites_side(left.strip()) + products = parse_metabolites_side(right.strip()) + + # Aggiungi metaboliti alla reazione + metabolites_to_add = {} + + # Reagenti (coefficienti negativi) + for met_id, coeff in reactants.items(): + if met_id in metabolites_dict: + metabolites_to_add[metabolites_dict[met_id]] = -coeff + + # Prodotti (coefficienti positivi) + for met_id, coeff in products.items(): + if met_id in metabolites_dict: + metabolites_to_add[metabolites_dict[met_id]] = coeff + + reaction.add_metabolites(metabolites_to_add) + + +def parse_metabolites_side(side_str: str) -> Dict[str, float]: + """ + Parsa un lato della reazione per estrarre metaboliti e coefficienti. + """ + metabolites = {} + if not side_str or side_str.strip() == '': + return metabolites + + terms = side_str.split('+') + for term in terms: + term = term.strip() + if not term: + continue + + # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> + match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) + if match: + coeff_str, met_id = match.groups() + coeff = float(coeff_str) if coeff_str else 1.0 + metabolites[met_id] = coeff + + return metabolites + + + +def set_biomass_objective(model: cobraModel): + """ + Imposta la reazione di biomassa come obiettivo. + """ + biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()] + + if biomass_reactions: + model.objective = biomass_reactions[0].id + print(f"Obiettivo impostato su: {biomass_reactions[0].id}") + else: + print("Nessuna reazione di biomassa trovata") + + +def set_medium_from_data(model: cobraModel, df: pd.DataFrame): + """ + Imposta il medium basato sulla colonna InMedium. + """ + medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() + + medium_dict = {} + for rxn_id in medium_reactions: + if rxn_id in [r.id for r in model.reactions]: + reaction = model.reactions.get_by_id(rxn_id) + if reaction.lower_bound < 0: # Solo reazioni di uptake + medium_dict[rxn_id] = abs(reaction.lower_bound) + + if medium_dict: + model.medium = medium_dict + print(f"Medium impostato con {len(medium_dict)} componenti") + + +def validate_model(model: cobraModel) -> Dict[str, any]: + """ + Valida il modello e fornisce statistiche di base. + """ + validation = { + 'num_reactions': len(model.reactions), + 'num_metabolites': len(model.metabolites), + 'num_genes': len(model.genes), + 'num_compartments': len(model.compartments), + 'objective': str(model.objective), + 'medium_size': len(model.medium), + 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), + 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), + } + + try: + # Test di crescita + solution = model.optimize() + validation['growth_rate'] = solution.objective_value + validation['status'] = solution.status + except Exception as e: + validation['growth_rate'] = None + validation['status'] = f"Error: {e}" + + return validation + +def convert_genes(model,annotation): + from cobra.manipulation import rename_genes + model2=model.copy() + try: + dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} + except: + print("No annotation in gene dict!") + return -1 + rename_genes(model2,dict_genes) + + return model2 \ No newline at end of file