diff COBRAxy/ras_to_bounds.py @ 489:97eea560a10f draft

Uploaded
author francesco_lapi
date Mon, 29 Sep 2025 10:33:26 +0000
parents 1e7a8da6c47a
children ffc234ec80db
--- a/COBRAxy/ras_to_bounds.py	Tue Sep 23 13:48:24 2025 +0000
+++ b/COBRAxy/ras_to_bounds.py	Mon Sep 29 10:33:26 2025 +0000
@@ -1,13 +1,24 @@
+"""
+Apply RAS-based scaling to reaction bounds and optionally save updated models.
+
+Workflow:
+- Read one or more RAS matrices (patients/samples x reactions)
+- Normalize and merge them, optionally adding class suffixes to sample IDs
+- Build a COBRA model from a tabular CSV
+- Run FVA to initialize bounds, then scale per-sample based on RAS values
+- Save bounds per sample and optionally export updated models in chosen formats
+"""
 import argparse
 import utils.general_utils as utils
-from typing import Optional, List
+from typing import Optional, Dict, Set, List, Tuple, Union
 import os
 import numpy as np
 import pandas as pd
 import cobra
+from cobra import Model
 import sys
-import csv
 from joblib import Parallel, delayed, cpu_count
+import utils.model_utils as modelUtils
 
 ################################# process args ###############################
 def process_args(args :List[str] = None) -> argparse.Namespace:
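The module docstring introduced in the hunk above summarizes the new pipeline. As a minimal sketch of those steps (file names are hypothetical, the tab separator of the RAS matrix is an assumption, and generate_bounds_model is the helper defined later in this diff):

import pandas as pd
import utils.model_utils as modelUtils

# Build the COBRA model from a tabular CSV (hypothetical path)
model = modelUtils.build_cobra_model_from_csv("ENGRO2_tabular.csv")

# One RAS matrix: a "Reactions" column plus one column per sample
ras = pd.read_csv("ras_patients.csv", sep="\t").set_index("Reactions").T.astype(float)
ras = ras.div(ras.max(axis=0))  # normalize each reaction by its maximum RAS

# Scale bounds per sample; the output folder must already exist
generate_bounds_model(model, ras=ras, output_folder="ras_to_bounds/",
                      save_models=True, save_models_format="json")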
@@ -23,23 +34,9 @@
     parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
                                      description = 'process some value\'s')
     
-    parser.add_argument(
-        '-ms', '--model_selector', 
-        type = utils.Model, default = utils.Model.ENGRO2, choices = [utils.Model.ENGRO2, utils.Model.Custom],
-        help = 'chose which type of model you want use')
     
-    parser.add_argument("-mo", "--model", type = str,
+    parser.add_argument("-mo", "--model_upload", type = str,
         help = "path to input file with custom rules, if provided")
-    
-    parser.add_argument("-mn", "--model_name", type = str, help = "custom mode name")
-
-    parser.add_argument(
-        '-mes', '--medium_selector', 
-        default = "allOpen",
-        help = 'chose which type of medium you want use')
-    
-    parser.add_argument("-meo", "--medium", type = str,
-        help = "path to input file with custom medium, if provided")
 
     parser.add_argument('-ol', '--out_log', 
                         help = "Output log")
@@ -57,11 +54,6 @@
     parser.add_argument('-rn', '--name',
                 type=str,
                 help = 'ras class names')
-    
-    parser.add_argument('-rs', '--ras_selector',
-                        required = True,
-                        type=utils.Bool("using_RAS"),
-                        help = 'ras selector')
 
     parser.add_argument('-cc', '--cell_class',
                     type = str,
@@ -72,6 +64,21 @@
         default='ras_to_bounds/',
         help = 'output path for maps')
     
+    parser.add_argument('-sm', '--save_models',
+                    type=utils.Bool("save_models"),
+                    default=False,
+                    help = 'whether to save models with applied bounds')
+    
+    parser.add_argument('-smp', '--save_models_path',
+                        type = str,
+                        default='saved_models/',
+                        help = 'output path for saved models')
+    
+    parser.add_argument('-smf', '--save_models_format',
+                        type = str,
+                        default='csv',
+                        help = 'format for saved models (csv, xml, json, mat, yaml, tabular)')
+
     
     ARGS = parser.parse_args(args)
     return ARGS
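With the model/medium selectors removed, the tool takes a tabular model via --model_upload plus the RAS inputs and the new saving options. A hypothetical programmatic call through main() (the RAS input flag lives in an unchanged part of the parser, so "--input_ras" and the literal accepted by utils.Bool are assumptions):

from ras_to_bounds import main  # assumes the COBRAxy directory is on the path

main([
    "--model_upload", "ENGRO2_tabular.csv",  # tabular model CSV (hypothetical path)
    "--input_ras", "rasA.csv,rasB.csv",      # flag name assumed; comma-separated RAS matrices
    "--name", "rasA.csv,rasB.csv",           # one class name per matrix
    "--cell_class", "class_assignments.tsv",
    "--out_log", "log.txt",
    "--save_models", "true",                 # accepted boolean literal is an assumption
    "--save_models_format", "json",
])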
@@ -87,8 +94,9 @@
     Returns:
       None
     """
-    with open(ARGS.out_log, 'a') as log:
-        log.write(s + "\n\n")
+    if ARGS.out_log:
+        with open(ARGS.out_log, 'a') as log:
+            log.write(s + "\n\n")
     print(s)
 
 ############################ dataset input ####################################
@@ -143,7 +151,100 @@
                 new_bounds.loc[reaction, "upper_bound"] = valMax
     return new_bounds
 
-def process_ras_cell(cellName, ras_row, model, rxns_ids, output_folder):
+
+def save_model(model, filename, output_folder, file_format='csv'):
+    """
+    Save a COBRA model to file in the specified format.
+    
+    Args:
+        model (cobra.Model): The model to save.
+        filename (str): Base filename (without extension).
+        output_folder (str): Output directory.
+        file_format (str): File format ('xml', 'json', 'mat', 'yaml', 'tabular', 'csv').
+    
+    Returns:
+        None
+    """
+    if not os.path.exists(output_folder):
+        os.makedirs(output_folder)
+    
+    try:
+        if file_format in ('tabular', 'csv'):
+            # Special handling for tabular format using utils functions
+            filepath = os.path.join(output_folder, f"{filename}.csv")
+            
+            rules = modelUtils.generate_rules(model, asParsed = False)
+            reactions = modelUtils.generate_reactions(model, asParsed = False)
+            bounds = modelUtils.generate_bounds(model)
+            medium = modelUtils.get_medium(model)
+            
+            try:
+                compartments = modelUtils.generate_compartments(model)
+            except Exception:
+                compartments = None
+
+            df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"])
+            df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"])
+            df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
+            df_medium = medium.rename(columns = {"reaction": "ReactionID"})
+            df_medium["InMedium"] = True
+
+            merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
+            merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
+            
+            # Add compartments only if they exist and model name is ENGRO2
+            if compartments is not None and hasattr(ARGS, 'name') and ARGS.name == "ENGRO2": 
+                merged = merged.merge(compartments, on = "ReactionID", how = "outer")
+            
+            merged = merged.merge(df_medium, on = "ReactionID", how = "left")
+            merged["InMedium"] = merged["InMedium"].fillna(False)
+            merged = merged.sort_values(by = "InMedium", ascending = False)
+            
+            merged.to_csv(filepath, sep="\t", index=False)
+            
+        else:
+            # Standard COBRA formats
+            filepath = os.path.join(output_folder, f"{filename}.{file_format}")
+            
+            if file_format == 'xml':
+                cobra.io.write_sbml_model(model, filepath)
+            elif file_format == 'json':
+                cobra.io.save_json_model(model, filepath)
+            elif file_format == 'mat':
+                cobra.io.save_matlab_model(model, filepath)
+            elif file_format == 'yaml':
+                cobra.io.save_yaml_model(model, filepath)
+            else:
+                raise ValueError(f"Unsupported format: {file_format}")
+        
+        print(f"Model saved: {filepath}")
+        
+    except Exception as e:
+        warning(f"Error saving model {filename}: {str(e)}")
+
+def apply_bounds_to_model(model, bounds):
+    """
+    Apply bounds from a DataFrame to a COBRA model.
+    
+    Args:
+        model (cobra.Model): The metabolic model to modify.
+        bounds (pd.DataFrame): DataFrame indexed by reaction ID with "lower_bound" and "upper_bound" columns.
+    
+    Returns:
+        cobra.Model: Modified model with new bounds.
+    """
+    model_copy = model.copy()
+    for reaction_id in bounds.index:
+        try:
+            reaction = model_copy.reactions.get_by_id(reaction_id)
+            reaction.lower_bound = bounds.loc[reaction_id, "lower_bound"]
+            reaction.upper_bound = bounds.loc[reaction_id, "upper_bound"]
+        except KeyError:
+            # Reaction not found in model, skip
+            continue
+    return model_copy
+
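A minimal usage sketch for the two helpers above, assuming model is an already loaded cobra.Model and using made-up reaction IDs:

import pandas as pd

bounds = pd.DataFrame(
    {"lower_bound": [0.0, -5.2], "upper_bound": [8.1, 5.2]},
    index=["EX_glc__D_e", "EX_o2_e"],  # hypothetical reaction IDs
)
patient_model = apply_bounds_to_model(model, bounds)  # model: an existing cobra.Model
save_model(patient_model, "patient1", "saved_models/", file_format="json")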
+def process_ras_cell(cellName, ras_row, model, rxns_ids, output_folder, save_models=False, save_models_path='saved_models/', save_models_format='csv'):
     """
     Process a single RAS cell, apply bounds, and save the bounds to a CSV file.
 
@@ -153,6 +254,9 @@
         model (cobra.Model): The metabolic model to be modified.
         rxns_ids (list of str): List of reaction IDs to which the scaling factors will be applied.
         output_folder (str): Folder path where the output CSV file will be saved.
+        save_models (bool): Whether to save models with applied bounds.
+        save_models_path (str): Path where to save models.
+        save_models_format (str): Format for saved models.
     
     Returns:
         None
@@ -160,32 +264,30 @@
     bounds = pd.DataFrame([(rxn.lower_bound, rxn.upper_bound) for rxn in model.reactions], index=rxns_ids, columns=["lower_bound", "upper_bound"])
     new_bounds = apply_ras_bounds(bounds, ras_row)
     new_bounds.to_csv(output_folder + cellName + ".csv", sep='\t', index=True)
-    pass
+    
+    # Save model if requested
+    if save_models:
+        modified_model = apply_bounds_to_model(model, new_bounds)
+        save_model(modified_model, cellName, save_models_path, save_models_format)
+    
+    return
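Each per-sample file written here is tab-separated, indexed by reaction ID, with lower_bound and upper_bound columns; it can be reloaded directly (file name hypothetical):

import pandas as pd

bounds = pd.read_csv("ras_to_bounds/patient1_rasA.csv", sep="\t", index_col=0)
print(bounds[["lower_bound", "upper_bound"]].head())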
 
-def generate_bounds(model: cobra.Model, medium: dict, ras=None, output_folder='output/') -> pd.DataFrame:
+def generate_bounds_model(model: cobra.Model, ras=None, output_folder='output/', save_models=False, save_models_path='saved_models/', save_models_format='csv') -> None:
     """
     Generate reaction bounds for a metabolic model based on medium conditions and optional RAS adjustments.
     
     Args:
         model (cobra.Model): The metabolic model for which bounds will be generated.
-        medium (dict): A dictionary where keys are reaction IDs and values are the medium conditions.
         ras (pd.DataFrame, optional): RAS pandas dataframe. Defaults to None.
         output_folder (str, optional): Folder path where output CSV files will be saved. Defaults to 'output/'.
+        save_models (bool): Whether to save models with applied bounds.
+        save_models_path (str): Path where to save models.
+        save_models_format (str): Format for saved models.
 
     Returns:
-        pd.DataFrame: DataFrame containing the bounds of reactions in the model.
+        None: per-sample bounds are written as CSV files in output_folder.
     """
-    rxns_ids = [rxn.id for rxn in model.reactions]
-
-    # Set all reactions to zero in the medium
-    for rxn_id, _ in model.medium.items():
-        model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
-    
-    # Set medium conditions
-    for reaction, value in medium.items():
-        if value is not None:
-            model.reactions.get_by_id(reaction).lower_bound = -float(value)
-            
+    rxns_ids = [rxn.id for rxn in model.reactions]
             
     # Perform Flux Variability Analysis (FVA) on this medium
     df_FVA = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0, processes=1).round(8)
@@ -196,19 +298,18 @@
         model.reactions.get_by_id(reaction).upper_bound = float(df_FVA.loc[reaction, "maximum"])
 
     if ras is not None:
-        Parallel(n_jobs=cpu_count())(delayed(process_ras_cell)(cellName, ras_row, model, rxns_ids, output_folder) for cellName, ras_row in ras.iterrows())
+        Parallel(n_jobs=cpu_count())(delayed(process_ras_cell)(
+            cellName, ras_row, model, rxns_ids, output_folder, 
+            save_models, save_models_path, save_models_format
+        ) for cellName, ras_row in ras.iterrows())
     else:
-        bounds = pd.DataFrame([(rxn.lower_bound, rxn.upper_bound) for rxn in model.reactions], index=rxns_ids, columns=["lower_bound", "upper_bound"])
-        newBounds = apply_ras_bounds(bounds, pd.Series([1]*len(rxns_ids), index=rxns_ids))
-        newBounds.to_csv(output_folder + "bounds.csv", sep='\t', index=True)
-    pass
-
-
+        raise ValueError("RAS DataFrame is None. Cannot generate bounds without RAS data.")
+    return
 
 ############################# main ###########################################
 def main(args:List[str] = None) -> None:
     """
-    Initializes everything and sets the program in motion based on the fronted input arguments.
+    Initialize and execute the RAS-to-bounds pipeline based on the frontend input arguments.
 
     Returns:
         None
@@ -216,71 +317,60 @@
     if not os.path.exists('ras_to_bounds'):
         os.makedirs('ras_to_bounds')
 
-
     global ARGS
     ARGS = process_args(args)
 
-    if(ARGS.ras_selector == True):
-        ras_file_list = ARGS.input_ras.split(",")
-        ras_file_names = ARGS.name.split(",")
-        if len(ras_file_names) != len(set(ras_file_names)):
-            error_message = "Duplicated file names in the uploaded RAS matrices."
-            warning(error_message)
-            raise ValueError(error_message)
-            pass
-        ras_class_names = []
-        for file in ras_file_names:
-            ras_class_names.append(file.rsplit(".", 1)[0])
-        ras_list = []
-        class_assignments = pd.DataFrame(columns=["Patient_ID", "Class"])
-        for ras_matrix, ras_class_name in zip(ras_file_list, ras_class_names):
-            ras = read_dataset(ras_matrix, "ras dataset")
-            ras.replace("None", None, inplace=True)
-            ras.set_index("Reactions", drop=True, inplace=True)
-            ras = ras.T
-            ras = ras.astype(float)
-            if(len(ras_file_list)>1):
-                #append class name to patient id (dataframe index)
-                ras.index = [f"{idx}_{ras_class_name}" for idx in ras.index]
-            else:
-                ras.index = [f"{idx}" for idx in ras.index]
-            ras_list.append(ras)
-            for patient_id in ras.index:
-                class_assignments.loc[class_assignments.shape[0]] = [patient_id, ras_class_name]
+
+    ras_file_list = ARGS.input_ras.split(",")
+    ras_file_names = ARGS.name.split(",")
+    if len(ras_file_names) != len(set(ras_file_names)):
+        error_message = "Duplicated file names in the uploaded RAS matrices."
+        warning(error_message)
+        raise ValueError(error_message)
         
+    ras_class_names = []
+    for file in ras_file_names:
+        ras_class_names.append(file.rsplit(".", 1)[0])
+    ras_list = []
+    class_assignments = pd.DataFrame(columns=["Patient_ID", "Class"])
+    for ras_matrix, ras_class_name in zip(ras_file_list, ras_class_names):
+        ras = read_dataset(ras_matrix, "ras dataset")
+        ras.replace("None", None, inplace=True)
+        ras.set_index("Reactions", drop=True, inplace=True)
+        ras = ras.T
+        ras = ras.astype(float)
+        if len(ras_file_list) > 1:
+            # Append class name to patient id (DataFrame index)
+            ras.index = [f"{idx}_{ras_class_name}" for idx in ras.index]
+        else:
+            ras.index = [f"{idx}" for idx in ras.index]
+        ras_list.append(ras)
+        for patient_id in ras.index:
+            class_assignments.loc[class_assignments.shape[0]] = [patient_id, ras_class_name]
+    
         
-        # Concatenate all ras DataFrames into a single DataFrame
-        ras_combined = pd.concat(ras_list, axis=0)
-    # Normalize the RAS values by max RAS
-        ras_combined = ras_combined.div(ras_combined.max(axis=0))
-        ras_combined.dropna(axis=1, how='all', inplace=True)
+    # Concatenate all RAS DataFrames into a single DataFrame
+    ras_combined = pd.concat(ras_list, axis=0)
+    # Normalize RAS values column-wise by max RAS
+    ras_combined = ras_combined.div(ras_combined.max(axis=0))
+    ras_combined.dropna(axis=1, how='all', inplace=True)
 
+    model = modelUtils.build_cobra_model_from_csv(ARGS.model_upload)
 
-    
-    model_type :utils.Model = ARGS.model_selector
-    if model_type is utils.Model.Custom:
-        model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext)
-    else:
-        model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir)
+    validation = modelUtils.validate_model(model)
+
+    print("\n=== MODEL VALIDATION ===")
+    for key, value in validation.items():
+        print(f"{key}: {value}")
 
-    if(ARGS.medium_selector == "Custom"):
-        medium = read_dataset(ARGS.medium, "medium dataset")
-        medium.set_index(medium.columns[0], inplace=True)
-        medium = medium.astype(float)
-        medium = medium[medium.columns[0]].to_dict()
-    else:
-        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
-        ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
-        medium = df_mediums[[ARGS.medium_selector]]
-        medium = medium[ARGS.medium_selector].to_dict()
 
-    if(ARGS.ras_selector == True):
-        generate_bounds(model, medium, ras = ras_combined, output_folder=ARGS.output_path)
-        class_assignments.to_csv(ARGS.cell_class, sep = '\t', index = False)
-    else:
-        generate_bounds(model, medium, output_folder=ARGS.output_path)
+    generate_bounds_model(model, ras=ras_combined, output_folder=ARGS.output_path,
+                    save_models=ARGS.save_models, save_models_path=ARGS.save_models_path,
+                    save_models_format=ARGS.save_models_format)
+    class_assignments.to_csv(ARGS.cell_class, sep='\t', index=False)
 
-    pass
+
+    return
         
 ##############################################################################
 if __name__ == "__main__":
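For reference, a small illustration of the RAS layout that main() expects and of the class suffix added to sample IDs when several matrices are uploaded (values and IDs are made up):

import pandas as pd

ras = pd.DataFrame({
    "Reactions": ["R1", "R2"],
    "patient1": [0.8, 1.0],
    "patient2": [0.4, 0.9],
})
ras = ras.set_index("Reactions").T.astype(float)   # rows: samples, columns: reactions
ras.index = [f"{idx}_tumor" for idx in ras.index]  # suffix applied when more than one matrix is given
print(ras.div(ras.max(axis=0)))                    # column-wise normalization by max RAS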