diff COBRAxy/ras_to_bounds.py @ 86:f17137e8824a draft

Uploaded
author luca_milaz
date Sun, 13 Oct 2024 11:31:52 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/COBRAxy/ras_to_bounds.py	Sun Oct 13 11:31:52 2024 +0000
@@ -0,0 +1,276 @@
+import argparse
+import utils.general_utils as utils
+from typing import Optional, List
+import os
+import numpy as np
+import pandas as pd
+import cobra
+import sys
+import csv
+from joblib import Parallel, delayed, cpu_count
+
+################################# process args ###############################
+def process_args(args :List[str]) -> argparse.Namespace:
+    """
+    Processes command-line arguments.
+
+    Args:
+        args (list): List of command-line arguments.
+
+    Returns:
+        Namespace: An object containing parsed arguments.
+    """
+    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
+                                     description = 'process some value\'s')
+    
+    parser.add_argument(
+        '-ms', '--model_selector', 
+        type = utils.Model, default = utils.Model.ENGRO2, choices = [utils.Model.ENGRO2, utils.Model.Custom],
+        help = 'chose which type of model you want use')
+    
+    parser.add_argument("-mo", "--model", type = str,
+        help = "path to input file with custom rules, if provided")
+    
+    parser.add_argument("-mn", "--model_name", type = str, help = "custom mode name")
+
+    parser.add_argument(
+        '-mes', '--medium_selector', 
+        default = "allOpen",
+        help = 'chose which type of medium you want use')
+    
+    parser.add_argument("-meo", "--medium", type = str,
+        help = "path to input file with custom medium, if provided")
+
+    parser.add_argument('-ol', '--out_log', 
+                        help = "Output log")
+    
+    parser.add_argument('-td', '--tool_dir',
+                        type = str,
+                        required = True,
+                        help = 'your tool directory')
+    
+    parser.add_argument('-ir', '--input_ras',
+                        type=str,
+                        required = False,
+                        help = 'input ras')
+    
+    parser.add_argument('-rn', '--names',
+                type=str,
+                help = 'ras class names')
+    
+    parser.add_argument('-rs', '--ras_selector',
+                        required = True,
+                        type=utils.Bool("using_RAS"),
+                        help = 'ras selector')
+
+    parser.add_argument('-cc', '--cell_class',
+                    type = str,
+                    help = 'output of cell class')
+    
+    
+    ARGS = parser.parse_args()
+    return ARGS
+
+########################### warning ###########################################
+def warning(s :str) -> None:
+    """
+    Log a warning message to an output log file and print it to the console.
+
+    Args:
+        s (str): The warning message to be logged and printed.
+    
+    Returns:
+      None
+    """
+    with open(ARGS.out_log, 'a') as log:
+        log.write(s + "\n\n")
+    print(s)
+
+############################ dataset input ####################################
+def read_dataset(data :str, name :str) -> pd.DataFrame:
+    """
+    Read a dataset from a CSV file and return it as a pandas DataFrame.
+
+    Args:
+        data (str): Path to the CSV file containing the dataset.
+        name (str): Name of the dataset, used in error messages.
+
+    Returns:
+        pandas.DataFrame: DataFrame containing the dataset.
+
+    Raises:
+        pd.errors.EmptyDataError: If the CSV file is empty.
+        sys.exit: If the CSV file has the wrong format, the execution is aborted.
+    """
+    try:
+        dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
+    except pd.errors.EmptyDataError:
+        sys.exit('Execution aborted: wrong format of ' + name + '\n')
+    if len(dataset.columns) < 2:
+        sys.exit('Execution aborted: wrong format of ' + name + '\n')
+    return dataset
+
+
+def apply_ras_bounds(model, ras_row, rxns_ids):
+    """
+    Adjust the bounds of reactions in the model based on RAS values.
+
+    Args:
+        model (cobra.Model): The metabolic model to be modified.
+        ras_row (pd.Series): A row from a RAS DataFrame containing scaling factors for reaction bounds.
+        rxns_ids (list of str): List of reaction IDs to which the scaling factors will be applied.
+    
+    Returns:
+        None
+    """
+    for reaction in rxns_ids:
+        if reaction in ras_row.index:
+            scaling_factor = ras_row[reaction]
+            lower_bound=model.reactions.get_by_id(reaction).lower_bound
+            upper_bound=model.reactions.get_by_id(reaction).upper_bound
+            #warning("Reaction: "+reaction+" Lower Bound: "+str(lower_bound)+" Upper Bound: "+str(upper_bound)+" Scaling Factor: "+str(scaling_factor))
+            valMax=float((upper_bound)*scaling_factor)
+            valMin=float((lower_bound)*scaling_factor)
+            if upper_bound!=0 and lower_bound==0:
+                model.reactions.get_by_id(reaction).upper_bound=valMax
+            if upper_bound==0 and lower_bound!=0:
+                model.reactions.get_by_id(reaction).lower_bound=valMin
+            if upper_bound!=0 and lower_bound!=0:
+                model.reactions.get_by_id(reaction).lower_bound=valMin
+                model.reactions.get_by_id(reaction).upper_bound=valMax
+    pass
+
+def process_ras_cell(cellName, ras_row, model, rxns_ids, output_folder):
+    """
+    Process a single RAS cell, apply bounds, and save the bounds to a CSV file.
+
+    Args:
+        cellName (str): The name of the RAS cell (used for naming the output file).
+        ras_row (pd.Series): A row from a RAS DataFrame containing scaling factors for reaction bounds.
+        model (cobra.Model): The metabolic model to be modified.
+        rxns_ids (list of str): List of reaction IDs to which the scaling factors will be applied.
+        output_folder (str): Folder path where the output CSV file will be saved.
+    
+    Returns:
+        None
+    """
+    model_new = model.copy()
+    apply_ras_bounds(model_new, ras_row, rxns_ids)
+    bounds = pd.DataFrame([(rxn.lower_bound, rxn.upper_bound) for rxn in model_new.reactions], index=rxns_ids, columns=["lower_bound", "upper_bound"])
+    bounds.to_csv(output_folder + cellName + ".csv", sep='\t', index=True)
+    pass
+
+def generate_bounds(model: cobra.Model, medium: dict, ras=None, output_folder='output/') -> pd.DataFrame:
+    """
+    Generate reaction bounds for a metabolic model based on medium conditions and optional RAS adjustments.
+    
+    Args:
+        model (cobra.Model): The metabolic model for which bounds will be generated.
+        medium (dict): A dictionary where keys are reaction IDs and values are the medium conditions.
+        ras (pd.DataFrame, optional): RAS pandas dataframe. Defaults to None.
+        output_folder (str, optional): Folder path where output CSV files will be saved. Defaults to 'output/'.
+
+    Returns:
+        pd.DataFrame: DataFrame containing the bounds of reactions in the model.
+    """
+    rxns_ids = [rxn.id for rxn in model.reactions]
+    
+    # Set medium conditions
+    for reaction, value in medium.items():
+        if value is not None:
+            model.reactions.get_by_id(reaction).lower_bound = -float(value)
+    
+    # Perform Flux Variability Analysis (FVA)
+    df_FVA = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0, processes=1).round(8)
+    
+    # Set FVA bounds
+    for reaction in rxns_ids:
+        rxn = model.reactions.get_by_id(reaction)
+        rxn.lower_bound = float(df_FVA.loc[reaction, "minimum"])
+        rxn.upper_bound = float(df_FVA.loc[reaction, "maximum"])
+
+    if ras is not None:
+        Parallel(n_jobs=cpu_count())(delayed(process_ras_cell)(cellName, ras_row, model, rxns_ids, output_folder) for cellName, ras_row in ras.iterrows())
+         #for cellName, ras_row in ras.iterrows():
+            #process_ras_cell(cellName, ras_row, model, rxns_ids, output_folder) 
+    else:
+        model_new = model.copy()
+        apply_ras_bounds(model_new, pd.Series([1]*len(rxns_ids), index=rxns_ids), rxns_ids)
+        bounds = pd.DataFrame([(rxn.lower_bound, rxn.upper_bound) for rxn in model_new.reactions], index=rxns_ids, columns=["lower_bound", "upper_bound"])
+        bounds.to_csv(output_folder + "bounds.csv", sep='\t', index=True)
+    pass
+
+
+
+############################# main ###########################################
+def main() -> None:
+    """
+    Initializes everything and sets the program in motion based on the fronted input arguments.
+
+    Returns:
+        None
+    """
+    if not os.path.exists('ras_to_bounds'):
+        os.makedirs('ras_to_bounds')
+
+
+    global ARGS
+    ARGS = process_args(sys.argv)
+
+    ARGS.output_folder = 'ras_to_bounds/'
+
+    if(ARGS.ras_selector == True):
+        ras_file_list = ARGS.input_ras.split(",")
+        ras_file_names = ARGS.names.split(",")
+        ras_class_names = []
+        for file in ras_file_names:
+            ras_class_names.append(file.split(".")[0])
+        ras_list = []
+        class_assignments = pd.DataFrame(columns=["Patient_ID", "Class"])
+        for ras_matrix, ras_class_name in zip(ras_file_list, ras_class_names):
+            ras = read_dataset(ras_matrix, "ras dataset")
+            ras.replace("None", None, inplace=True)
+            ras.set_index("Reactions", drop=True, inplace=True)
+            ras = ras.T
+            ras = ras.astype(float)
+            ras_list.append(ras)
+            for patient_id in ras.index:
+                class_assignments.loc[class_assignments.shape[0]] = [patient_id, ras_class_name]
+        
+        
+        # Concatenate all ras DataFrames into a single DataFrame
+        ras_combined = pd.concat(ras_list, axis=0)
+        # Normalize the RAS values by max RAS
+        ras_combined = ras_combined.div(ras_combined.max(axis=0))
+        ras_combined = ras_combined.fillna(0)
+
+
+    
+    model_type :utils.Model = ARGS.model_selector
+    if model_type is utils.Model.Custom:
+        model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext)
+    else:
+        model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir)
+
+    if(ARGS.medium_selector == "Custom"):
+        medium = read_dataset(ARGS.medium, "medium dataset")
+        medium.set_index(medium.columns[0], inplace=True)
+        medium = medium.astype(float)
+        medium = medium[medium.columns[0]].to_dict()
+    else:
+        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
+        ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
+        medium = df_mediums[[ARGS.medium_selector]]
+        medium = medium[ARGS.medium_selector].to_dict()
+
+    if(ARGS.ras_selector == True):
+        generate_bounds(model, medium, ras = ras_combined, output_folder=ARGS.output_folder)
+        class_assignments.to_csv(ARGS.cell_class, sep = '\t', index = False)
+    else:
+        generate_bounds(model, medium, output_folder=ARGS.output_folder)
+
+    pass
+        
+##############################################################################
+if __name__ == "__main__":
+    main()
\ No newline at end of file