Mercurial > repos > bimib > marea_2
changeset 152:c0cb72d92fd9 draft
Uploaded
| author | luca_milaz | 
|---|---|
| date | Mon, 22 Jul 2024 11:24:46 +0000 | 
| parents | d1417471af18 | 
| children | fc183a80dae3 | 
| files | marea_2/flux_simulation.py | 
| diffstat | 1 files changed, 270 insertions(+), 0 deletions(-) [+] | 
line wrap: on
 line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/marea_2/flux_simulation.py Mon Jul 22 11:24:46 2024 +0000 @@ -0,0 +1,270 @@ +import argparse +import utils.general_utils as utils +from typing import Optional, List +import os +import numpy as np +import pandas as pd +import cobra +import utils.CBS_backend as CBS_backend +from joblib import Parallel, delayed, cpu_count +from cobra.sampling import OptGPSampler +import sys + +################################# process args ############################### +def process_args(args :List[str]) -> argparse.Namespace: + """ + Processes command-line arguments. + + Args: + args (list): List of command-line arguments. + + Returns: + Namespace: An object containing parsed arguments. + """ + parser = argparse.ArgumentParser(usage = '%(prog)s [options]', + description = 'process some value\'s') + + parser.add_argument('-ol', '--out_log', + help = "Output log") + + parser.add_argument('-td', '--tool_dir', + type = str, + required = True, + help = 'your tool directory') + + + + + + parser.add_argument('-in', '--input', + required = True, + type=str, + help = 'inputs model') + + parser.add_argument('-nm', '--name', + required = True, + type=str, + help = 'inputs model ids') + + parser.add_argument('-a', '--algorithm', + type = str, + choices = ['OPTGP', 'CBS'], + required = True, + help = 'choose sampling algorithm') + + parser.add_argument('-th', '--thinning', + type = int, + default= 100, + required=False, + help = 'choose thinning') + + parser.add_argument('-ns', '--n_samples', + type = int, + required = True, + help = 'choose how many samples') + + parser.add_argument('-sd', '--seed', + type = int, + required = True, + help = 'seed') + + parser.add_argument('-nb', '--n_batches', + type = int, + required = True, + help = 'choose how many batches') + + parser.add_argument('-ot', '--output_type', + type = str, + required = True, + help = 'output type') + + parser.add_argument('-ot', '--output_type_analysis', + type = str, + required = False, + help = 'output type analysis') + + ARGS = parser.parse_args() + return ARGS + +########################### warning ########################################### +def warning(s :str) -> None: + """ + Log a warning message to an output log file and print it to the console. + + Args: + s (str): The warning message to be logged and printed. + + Returns: + None + """ + with open(ARGS.out_log, 'a') as log: + log.write(s + "\n\n") + print(s) + + +def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None: + dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = keep_index) + + + +def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None: + + for i in range(0, n_batches): + optgp = OptGPSampler(model, thinning, seed) + samples = optgp.sample(n_samples) + samples.to_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_OPTGP.csv', index=False) + seed+=1 + samplesTotal = pd.DataFrame() + for i in range(0, n_batches): + samples_batch = pd.read_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_OPTGP.csv') + samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) + + write_to_file(samplesTotal, model_name) + + for i in range(0, n_batches): + os.remove(ARGS.output_folder + model_name + '_'+ str(i)+'_OPTGP.csv') + pass + + +def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None: + + df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6) + + df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed) + + for i in range(0, n_batches): + samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples)) + try: + CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples) + except Exception as e: + utils.logWarning( + "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e), + ARGS.out_log) + CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], + samples) + samples.to_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_CBS.csv', index=False) + + samplesTotal = pd.DataFrame() + for i in range(0, n_batches): + samples_batch = pd.read_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_CBS.csv') + samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) + + write_to_file(samplesTotal, model_name) + + for i in range(0, n_batches): + os.remove(ARGS.output_folder + model_name + '_'+ str(i)+'_CBS.csv') + pass + + +def model_sampler(model_input:str, model_name:str)-> List[pd.DataFrame]: + + model_type = utils.Model.Custom + model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(model_input), customExtension = utils.FilePath.fromStrPath(model_name).ext) + + utils.logWarning( + "Sampling model: " + model_name, + ARGS.out_log) + + name = model_name.split('.')[0] + + if ARGS.algorithm == 'OPTGP': + OPTGP_sampler(model, name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) + + elif ARGS.algorithm == 'CBS': + CBS_sampler(model, name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) + + df_mean, df_median, df_quantiles = fluxes_statistics(name, ARGS.output_types) + + if("fluxes" not in ARGS.output_types): + os.remove(ARGS.output_folder + name + '.csv') + + return df_mean, df_median, df_quantiles + +def fluxes_statistics(model_name: str, output_types:List)-> List[pd.DataFrame]: + + df_mean = pd.DataFrame() + df_median= pd.DataFrame() + df_quantiles= pd.DataFrame() + + df_samples = pd.read_csv(ARGS.output_folder + model_name + '.csv', sep = '\t') + for output_type in output_types: + if(output_type == "mean"): + df_mean = df_samples.mean() + df_mean = df_mean.to_frame().T + df_mean = df_mean.reset_index(drop=True) + df_mean.index = [model_name] + elif(output_type == "median"): + df_median = df_samples.median() + df_median = df_median.to_frame().T + df_median = df_median.reset_index(drop=True) + df_median.index = [model_name] + elif(output_type == "quantiles"): + df_quantile = df_samples.quantile([0.25, 0.5, 0.75]) + newRow = [] + cols = [] + for rxn in df_quantile.columns: + newRow.append(df_quantile[rxn].loc[0.25]) + cols.append(rxn + "_q1") + newRow.append(df_quantile[rxn].loc[0.5]) + cols.append(rxn + "_q2") + newRow.append(df_quantile[rxn].loc[0.75]) + cols.append(rxn + "_q3") + df_quantiles = pd.DataFrame(columns=cols) + df_quantiles.loc[0] = newRow + df_quantiles = df_quantiles.reset_index(drop=True) + df_quantiles.index = [model_name] + + return df_mean, df_median, df_quantiles + +############################# main ########################################### +def main() -> None: + """ + Initializes everything and sets the program in motion based on the fronted input arguments. + + Returns: + None + """ + if not os.path.exists('flux_sampling'): + os.makedirs('flux_sampling') + + num_processors = cpu_count() + + global ARGS + ARGS = process_args(sys.argv) + + ARGS.output_folder = 'flux_sampling/' + + utils.logWarning( + ARGS.output_type, + ARGS.out_log) + + models_input = ARGS.input.split(",") + models_name = ARGS.name.split(",") + ARGS.output_types = ARGS.output_type.split(",") + + + results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model_input, model_name) for model_input, model_name in zip(models_input, models_name)) + + all_mean = pd.concat([result[0] for result in results], ignore_index=False) + all_median = pd.concat([result[1] for result in results], ignore_index=False) + all_quantiles = pd.concat([result[2] for result in results], ignore_index=False) + + if("mean" in ARGS.output_types): + all_mean = all_mean.fillna(0.0) + all_mean = all_mean.sort_index() + write_to_file(all_mean, "mean", True) + + if("median" in ARGS.output_types): + all_median = all_median.fillna(0.0) + all_median = all_median.sort_index() + write_to_file(all_median, "median", True) + + if("quantiles" in ARGS.output_types): + all_quantiles = all_quantiles.fillna(0.0) + all_quantiles = all_quantiles.sort_index() + write_to_file(all_quantiles, "quantiles", True) + pass + +############################################################################## +if __name__ == "__main__": + main() \ No newline at end of file
