| 
456
 | 
     1 """
 | 
| 
 | 
     2 Flux sampling and analysis utilities for COBRA models.
 | 
| 
 | 
     3 
 | 
| 
 | 
     4 This script supports two modes:
 | 
| 
 | 
     5 - Mode 1 (model_and_bounds=True): load a base model and apply bounds from
 | 
| 
 | 
     6     separate files before sampling.
 | 
| 
 | 
     7 - Mode 2 (model_and_bounds=False): load complete models and sample directly.
 | 
| 
 | 
     8 
 | 
| 
 | 
     9 Sampling algorithms supported: OPTGP and CBS. Outputs include flux samples
 | 
| 
 | 
    10 and optional analyses (pFBA, FVA, sensitivity), saved as tabular files.
 | 
| 
 | 
    11 """
 | 
| 
 | 
    12 
 | 
| 
410
 | 
    13 import argparse
 | 
| 
 | 
    14 import utils.general_utils as utils
 | 
| 
456
 | 
    15 from typing import List
 | 
| 
410
 | 
    16 import os
 | 
| 
 | 
    17 import pandas as pd
 | 
| 
 | 
    18 import cobra
 | 
| 
 | 
    19 import utils.CBS_backend as CBS_backend
 | 
| 
 | 
    20 from joblib import Parallel, delayed, cpu_count
 | 
| 
 | 
    21 from cobra.sampling import OptGPSampler
 | 
| 
 | 
    22 import sys
 | 
| 
419
 | 
    23 import utils.model_utils as model_utils
 | 
| 
410
 | 
    24 
 | 
| 
 | 
    25 
 | 
| 
 | 
    26 ################################# process args ###############################
 | 
| 
 | 
    27 def process_args(args :List[str] = None) -> argparse.Namespace:
 | 
| 
 | 
    28     """
 | 
| 
 | 
    29     Processes command-line arguments.
 | 
| 
 | 
    30 
 | 
| 
 | 
    31     Args:
 | 
| 
 | 
    32         args (list): List of command-line arguments.
 | 
| 
 | 
    33 
 | 
| 
 | 
    34     Returns:
 | 
| 
 | 
    35         Namespace: An object containing parsed arguments.
 | 
| 
 | 
    36     """
 | 
| 
 | 
    37     parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
 | 
| 
 | 
    38                                      description = 'process some value\'s')
 | 
| 
 | 
    39     
 | 
| 
 | 
    40     parser.add_argument("-mo", "--model_upload", type = str,
 | 
| 
 | 
    41         help = "path to input file with custom rules, if provided")
 | 
| 
 | 
    42 
 | 
| 
419
 | 
    43     parser.add_argument("-mab", "--model_and_bounds", type = str,
 | 
| 
 | 
    44         choices = ['True', 'False'],
 | 
| 
 | 
    45         required = True,
 | 
| 
 | 
    46         help = "upload mode: True for model+bounds, False for complete models")
 | 
| 
 | 
    47 
 | 
| 
 | 
    48 
 | 
| 
410
 | 
    49     parser.add_argument('-ol', '--out_log', 
 | 
| 
 | 
    50                         help = "Output log")
 | 
| 
 | 
    51     
 | 
| 
 | 
    52     parser.add_argument('-td', '--tool_dir',
 | 
| 
 | 
    53                         type = str,
 | 
| 
 | 
    54                         required = True,
 | 
| 
 | 
    55                         help = 'your tool directory')
 | 
| 
 | 
    56     
 | 
| 
 | 
    57     parser.add_argument('-in', '--input',
 | 
| 
419
 | 
    58                     required = True,
 | 
| 
 | 
    59                     type=str,
 | 
| 
 | 
    60                     help = 'input bounds files or complete model files')
 | 
| 
410
 | 
    61     
 | 
| 
419
 | 
    62     parser.add_argument('-ni', '--name',
 | 
| 
410
 | 
    63                         required = True,
 | 
| 
 | 
    64                         type=str,
 | 
| 
 | 
    65                         help = 'cell names')
 | 
| 
 | 
    66     
 | 
| 
 | 
    67     parser.add_argument('-a', '--algorithm',
 | 
| 
 | 
    68                         type = str,
 | 
| 
 | 
    69                         choices = ['OPTGP', 'CBS'],
 | 
| 
 | 
    70                         required = True,
 | 
| 
 | 
    71                         help = 'choose sampling algorithm')
 | 
| 
 | 
    72     
 | 
| 
 | 
    73     parser.add_argument('-th', '--thinning', 
 | 
| 
 | 
    74                         type = int,
 | 
| 
 | 
    75                         default= 100,
 | 
| 
 | 
    76                         required=False,
 | 
| 
 | 
    77                         help = 'choose thinning')
 | 
| 
 | 
    78     
 | 
| 
 | 
    79     parser.add_argument('-ns', '--n_samples', 
 | 
| 
 | 
    80                         type = int,
 | 
| 
 | 
    81                         required = True,
 | 
| 
 | 
    82                         help = 'choose how many samples')
 | 
| 
 | 
    83     
 | 
| 
 | 
    84     parser.add_argument('-sd', '--seed', 
 | 
| 
 | 
    85                         type = int,
 | 
| 
 | 
    86                         required = True,
 | 
| 
 | 
    87                         help = 'seed')
 | 
| 
 | 
    88     
 | 
| 
 | 
    89     parser.add_argument('-nb', '--n_batches', 
 | 
| 
 | 
    90                         type = int,
 | 
| 
 | 
    91                         required = True,
 | 
| 
 | 
    92                         help = 'choose how many batches')
 | 
| 
 | 
    93     
 | 
| 
430
 | 
    94     parser.add_argument('-opt', '--perc_opt',
 | 
| 
 | 
    95                         type = float,
 | 
| 
 | 
    96                         default=0.9,
 | 
| 
 | 
    97                         required = False,
 | 
| 
 | 
    98                         help = 'choose the fraction of optimality for FVA (0-1)')
 | 
| 
 | 
    99     
 | 
| 
410
 | 
   100     parser.add_argument('-ot', '--output_type', 
 | 
| 
 | 
   101                         type = str,
 | 
| 
 | 
   102                         required = True,
 | 
| 
 | 
   103                         help = 'output type')
 | 
| 
 | 
   104     
 | 
| 
 | 
   105     parser.add_argument('-ota', '--output_type_analysis', 
 | 
| 
 | 
   106                         type = str,
 | 
| 
 | 
   107                         required = False,
 | 
| 
 | 
   108                         help = 'output type analysis')
 | 
| 
 | 
   109     
 | 
| 
 | 
   110     parser.add_argument('-idop', '--output_path', 
 | 
| 
 | 
   111                         type = str,
 | 
| 
 | 
   112                         default='flux_simulation',
 | 
| 
 | 
   113                         help = 'output path for maps')
 | 
| 
 | 
   114     
 | 
| 
 | 
   115     ARGS = parser.parse_args(args)
 | 
| 
 | 
   116     return ARGS
 | 
| 
 | 
   117 
 | 
| 
 | 
   118 ########################### warning ###########################################
 | 
| 
 | 
   119 def warning(s :str) -> None:
 | 
| 
 | 
   120     """
 | 
| 
 | 
   121     Log a warning message to an output log file and print it to the console.
 | 
| 
 | 
   122 
 | 
| 
 | 
   123     Args:
 | 
| 
 | 
   124         s (str): The warning message to be logged and printed.
 | 
| 
 | 
   125     
 | 
| 
 | 
   126     Returns:
 | 
| 
 | 
   127       None
 | 
| 
 | 
   128     """
 | 
| 
 | 
   129     with open(ARGS.out_log, 'a') as log:
 | 
| 
 | 
   130         log.write(s + "\n\n")
 | 
| 
 | 
   131     print(s)
 | 
| 
 | 
   132 
 | 
| 
 | 
   133 
 | 
| 
 | 
   134 def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None:
 | 
| 
456
 | 
   135     """
 | 
| 
 | 
   136     Write a DataFrame to a TSV file under ARGS.output_path with a given base name.
 | 
| 
 | 
   137 
 | 
| 
 | 
   138     Args:
 | 
| 
 | 
   139         dataset: The DataFrame to write.
 | 
| 
 | 
   140         name: Base file name (without extension).
 | 
| 
 | 
   141         keep_index: Whether to keep the DataFrame index in the file.
 | 
| 
 | 
   142 
 | 
| 
 | 
   143     Returns:
 | 
| 
 | 
   144         None
 | 
| 
 | 
   145     """
 | 
| 
410
 | 
   146     dataset.index.name = 'Reactions'
 | 
| 
 | 
   147     dataset.to_csv(ARGS.output_path + "/" + name + ".csv", sep = '\t', index = keep_index)
 | 
| 
 | 
   148 
 | 
| 
 | 
   149 ############################ dataset input ####################################
 | 
| 
 | 
   150 def read_dataset(data :str, name :str) -> pd.DataFrame:
 | 
| 
 | 
   151     """
 | 
| 
 | 
   152     Read a dataset from a CSV file and return it as a pandas DataFrame.
 | 
| 
 | 
   153 
 | 
| 
 | 
   154     Args:
 | 
| 
 | 
   155         data (str): Path to the CSV file containing the dataset.
 | 
| 
 | 
   156         name (str): Name of the dataset, used in error messages.
 | 
| 
 | 
   157 
 | 
| 
 | 
   158     Returns:
 | 
| 
 | 
   159         pandas.DataFrame: DataFrame containing the dataset.
 | 
| 
 | 
   160 
 | 
| 
 | 
   161     Raises:
 | 
| 
 | 
   162         pd.errors.EmptyDataError: If the CSV file is empty.
 | 
| 
 | 
   163         sys.exit: If the CSV file has the wrong format, the execution is aborted.
 | 
| 
 | 
   164     """
 | 
| 
 | 
   165     try:
 | 
| 
 | 
   166         dataset = pd.read_csv(data, sep = '\t', header = 0, index_col=0, engine='python')
 | 
| 
 | 
   167     except pd.errors.EmptyDataError:
 | 
| 
 | 
   168         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
   169     if len(dataset.columns) < 2:
 | 
| 
 | 
   170         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
   171     return dataset
 | 
| 
 | 
   172 
 | 
| 
 | 
   173 
 | 
| 
 | 
   174 
 | 
| 
 | 
   175 def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None:
 | 
| 
 | 
   176     """
 | 
| 
 | 
   177     Samples from the OPTGP (Optimal Global Perturbation) algorithm and saves the results to CSV files.
 | 
| 
 | 
   178 
 | 
| 
 | 
   179     Args:
 | 
| 
 | 
   180         model (cobra.Model): The COBRA model to sample from.
 | 
| 
 | 
   181         model_name (str): The name of the model, used in naming output files.
 | 
| 
 | 
   182         n_samples (int, optional): Number of samples per batch. Default is 1000.
 | 
| 
 | 
   183         thinning (int, optional): Thinning parameter for the sampler. Default is 100.
 | 
| 
 | 
   184         n_batches (int, optional): Number of batches to run. Default is 1.
 | 
| 
 | 
   185         seed (int, optional): Random seed for reproducibility. Default is 0.
 | 
| 
 | 
   186     
 | 
| 
 | 
   187     Returns:
 | 
| 
 | 
   188         None
 | 
| 
 | 
   189     """
 | 
| 
 | 
   190 
 | 
| 
 | 
   191     for i in range(0, n_batches):
 | 
| 
 | 
   192         optgp = OptGPSampler(model, thinning, seed)
 | 
| 
 | 
   193         samples = optgp.sample(n_samples)
 | 
| 
 | 
   194         samples.to_csv(ARGS.output_path + "/" +  model_name + '_'+ str(i)+'_OPTGP.csv', index=False)
 | 
| 
 | 
   195         seed+=1
 | 
| 
 | 
   196     samplesTotal = pd.DataFrame()
 | 
| 
 | 
   197     for i in range(0, n_batches):
 | 
| 
 | 
   198         samples_batch = pd.read_csv(ARGS.output_path + "/"  +  model_name + '_'+ str(i)+'_OPTGP.csv')
 | 
| 
 | 
   199         samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
 | 
| 
 | 
   200 
 | 
| 
 | 
   201     write_to_file(samplesTotal.T, model_name, True)
 | 
| 
 | 
   202 
 | 
| 
 | 
   203     for i in range(0, n_batches):
 | 
| 
 | 
   204         os.remove(ARGS.output_path + "/" +   model_name + '_'+ str(i)+'_OPTGP.csv')
 | 
| 
 | 
   205 
 | 
| 
 | 
   206 
 | 
| 
 | 
   207 def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None:
 | 
| 
 | 
   208     """
 | 
| 
 | 
   209     Samples using the CBS (Constraint-based Sampling) algorithm and saves the results to CSV files.
 | 
| 
 | 
   210 
 | 
| 
 | 
   211     Args:
 | 
| 
 | 
   212         model (cobra.Model): The COBRA model to sample from.
 | 
| 
 | 
   213         model_name (str): The name of the model, used in naming output files.
 | 
| 
 | 
   214         n_samples (int, optional): Number of samples per batch. Default is 1000.
 | 
| 
 | 
   215         n_batches (int, optional): Number of batches to run. Default is 1.
 | 
| 
 | 
   216         seed (int, optional): Random seed for reproducibility. Default is 0.
 | 
| 
 | 
   217     
 | 
| 
 | 
   218     Returns:
 | 
| 
 | 
   219         None
 | 
| 
 | 
   220     """
 | 
| 
 | 
   221 
 | 
| 
 | 
   222     df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6)
 | 
| 
 | 
   223     
 | 
| 
 | 
   224     df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed)
 | 
| 
 | 
   225 
 | 
| 
 | 
   226     for i in range(0, n_batches):
 | 
| 
 | 
   227         samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples))
 | 
| 
 | 
   228         try:
 | 
| 
 | 
   229             CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples)
 | 
| 
 | 
   230         except Exception as e:
 | 
| 
 | 
   231             utils.logWarning(
 | 
| 
 | 
   232             "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e),
 | 
| 
 | 
   233             ARGS.out_log)
 | 
| 
 | 
   234             CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], 
 | 
| 
 | 
   235                                                     samples)
 | 
| 
 | 
   236         utils.logWarning(ARGS.output_path + "/" +  model_name + '_'+ str(i)+'_CBS.csv', ARGS.out_log)
 | 
| 
 | 
   237         samples.to_csv(ARGS.output_path + "/" +  model_name + '_'+ str(i)+'_CBS.csv', index=False)
 | 
| 
 | 
   238 
 | 
| 
 | 
   239     samplesTotal = pd.DataFrame()
 | 
| 
 | 
   240     for i in range(0, n_batches):
 | 
| 
 | 
   241         samples_batch = pd.read_csv(ARGS.output_path + "/"  +  model_name + '_'+ str(i)+'_CBS.csv')
 | 
| 
 | 
   242         samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
 | 
| 
 | 
   243 
 | 
| 
 | 
   244     write_to_file(samplesTotal.T, model_name, True)
 | 
| 
 | 
   245 
 | 
| 
 | 
   246     for i in range(0, n_batches):
 | 
| 
 | 
   247         os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv')
 | 
| 
 | 
   248 
 | 
| 
 | 
   249 
 | 
| 
419
 | 
   250 
 | 
| 
 | 
   251 def model_sampler_with_bounds(model_input_original: cobra.Model, bounds_path: str, cell_name: str) -> List[pd.DataFrame]:
 | 
| 
410
 | 
   252     """
 | 
| 
419
 | 
   253     MODE 1: Prepares the model with bounds from separate bounds file and performs sampling.
 | 
| 
410
 | 
   254 
 | 
| 
 | 
   255     Args:
 | 
| 
 | 
   256         model_input_original (cobra.Model): The original COBRA model.
 | 
| 
 | 
   257         bounds_path (str): Path to the CSV file containing the bounds dataset.
 | 
| 
 | 
   258         cell_name (str): Name of the cell, used to generate filenames for output.
 | 
| 
 | 
   259 
 | 
| 
 | 
   260     Returns:
 | 
| 
 | 
   261         List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results.
 | 
| 
 | 
   262     """
 | 
| 
 | 
   263 
 | 
| 
 | 
   264     model_input = model_input_original.copy()
 | 
| 
 | 
   265     bounds_df = read_dataset(bounds_path, "bounds dataset")
 | 
| 
419
 | 
   266     
 | 
| 
 | 
   267     # Apply bounds to model
 | 
| 
410
 | 
   268     for rxn_index, row in bounds_df.iterrows():
 | 
| 
419
 | 
   269         try:
 | 
| 
 | 
   270             model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound
 | 
| 
 | 
   271             model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound
 | 
| 
 | 
   272         except KeyError:
 | 
| 
 | 
   273             warning(f"Warning: Reaction {rxn_index} not found in model. Skipping.")
 | 
| 
410
 | 
   274     
 | 
| 
419
 | 
   275     return perform_sampling_and_analysis(model_input, cell_name)
 | 
| 
 | 
   276 
 | 
| 
 | 
   277 
 | 
| 
 | 
   278 def perform_sampling_and_analysis(model_input: cobra.Model, cell_name: str) -> List[pd.DataFrame]:
 | 
| 
 | 
   279     """
 | 
| 
 | 
   280     Common function to perform sampling and analysis on a prepared model.
 | 
| 
 | 
   281 
 | 
| 
 | 
   282     Args:
 | 
| 
 | 
   283         model_input (cobra.Model): The prepared COBRA model with bounds applied.
 | 
| 
 | 
   284         cell_name (str): Name of the cell, used to generate filenames for output.
 | 
| 
 | 
   285 
 | 
| 
 | 
   286     Returns:
 | 
| 
 | 
   287         List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results.
 | 
| 
 | 
   288     """
 | 
| 
410
 | 
   289     
 | 
| 
 | 
   290     if ARGS.algorithm == 'OPTGP':
 | 
| 
 | 
   291         OPTGP_sampler(model_input, cell_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed)
 | 
| 
 | 
   292     elif ARGS.algorithm == 'CBS':
 | 
| 
419
 | 
   293         CBS_sampler(model_input, cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed)
 | 
| 
410
 | 
   294 
 | 
| 
 | 
   295     df_mean, df_median, df_quantiles = fluxes_statistics(cell_name, ARGS.output_types)
 | 
| 
 | 
   296 
 | 
| 
 | 
   297     if("fluxes" not in ARGS.output_types):
 | 
| 
419
 | 
   298         os.remove(ARGS.output_path + "/" + cell_name + '.csv')
 | 
| 
410
 | 
   299 
 | 
| 
419
 | 
   300     returnList = [df_mean, df_median, df_quantiles]
 | 
| 
410
 | 
   301 
 | 
| 
 | 
   302     df_pFBA, df_FVA, df_sensitivity = fluxes_analysis(model_input, cell_name, ARGS.output_type_analysis)
 | 
| 
 | 
   303 
 | 
| 
 | 
   304     if("pFBA" in ARGS.output_type_analysis):
 | 
| 
 | 
   305         returnList.append(df_pFBA)
 | 
| 
 | 
   306     if("FVA" in ARGS.output_type_analysis):
 | 
| 
 | 
   307         returnList.append(df_FVA)
 | 
| 
 | 
   308     if("sensitivity" in ARGS.output_type_analysis):
 | 
| 
 | 
   309         returnList.append(df_sensitivity)
 | 
| 
 | 
   310 
 | 
| 
 | 
   311     return returnList
 | 
| 
 | 
   312 
 | 
| 
 | 
   313 def fluxes_statistics(model_name: str,  output_types:List)-> List[pd.DataFrame]:
 | 
| 
 | 
   314     """
 | 
| 
 | 
   315     Computes statistics (mean, median, quantiles) for the fluxes.
 | 
| 
 | 
   316 
 | 
| 
 | 
   317     Args:
 | 
| 
 | 
   318         model_name (str): Name of the model, used in filename for input.
 | 
| 
 | 
   319         output_types (List[str]): Types of statistics to compute (mean, median, quantiles).
 | 
| 
 | 
   320 
 | 
| 
 | 
   321     Returns:
 | 
| 
 | 
   322         List[pd.DataFrame]: List of DataFrames containing mean, median, and quantiles statistics.
 | 
| 
 | 
   323     """
 | 
| 
 | 
   324 
 | 
| 
 | 
   325     df_mean = pd.DataFrame()
 | 
| 
 | 
   326     df_median= pd.DataFrame()
 | 
| 
 | 
   327     df_quantiles= pd.DataFrame()
 | 
| 
 | 
   328 
 | 
| 
 | 
   329     df_samples = pd.read_csv(ARGS.output_path + "/"  +  model_name + '.csv', sep = '\t', index_col = 0).T
 | 
| 
 | 
   330     df_samples = df_samples.round(8)
 | 
| 
 | 
   331 
 | 
| 
 | 
   332     for output_type in output_types:
 | 
| 
 | 
   333         if(output_type == "mean"):
 | 
| 
 | 
   334             df_mean = df_samples.mean()
 | 
| 
 | 
   335             df_mean = df_mean.to_frame().T
 | 
| 
 | 
   336             df_mean = df_mean.reset_index(drop=True)
 | 
| 
 | 
   337             df_mean.index = [model_name]
 | 
| 
 | 
   338         elif(output_type == "median"):
 | 
| 
 | 
   339             df_median = df_samples.median()
 | 
| 
 | 
   340             df_median = df_median.to_frame().T
 | 
| 
 | 
   341             df_median = df_median.reset_index(drop=True)
 | 
| 
 | 
   342             df_median.index = [model_name]
 | 
| 
 | 
   343         elif(output_type == "quantiles"):
 | 
| 
 | 
   344             newRow = []
 | 
| 
 | 
   345             cols = []
 | 
| 
 | 
   346             for rxn in df_samples.columns:
 | 
| 
 | 
   347                 quantiles = df_samples[rxn].quantile([0.25, 0.50, 0.75])
 | 
| 
 | 
   348                 newRow.append(quantiles[0.25])
 | 
| 
 | 
   349                 cols.append(rxn + "_q1")
 | 
| 
 | 
   350                 newRow.append(quantiles[0.5])
 | 
| 
 | 
   351                 cols.append(rxn + "_q2")
 | 
| 
 | 
   352                 newRow.append(quantiles[0.75])
 | 
| 
 | 
   353                 cols.append(rxn + "_q3")
 | 
| 
 | 
   354             df_quantiles = pd.DataFrame(columns=cols)
 | 
| 
 | 
   355             df_quantiles.loc[0] = newRow
 | 
| 
 | 
   356             df_quantiles = df_quantiles.reset_index(drop=True)
 | 
| 
 | 
   357             df_quantiles.index = [model_name]
 | 
| 
 | 
   358     
 | 
| 
 | 
   359     return df_mean, df_median, df_quantiles
 | 
| 
 | 
   360 
 | 
| 
 | 
   361 def fluxes_analysis(model:cobra.Model,  model_name:str, output_types:List)-> List[pd.DataFrame]:
 | 
| 
 | 
   362     """
 | 
| 
 | 
   363     Performs flux analysis including pFBA, FVA, and sensitivity analysis.
 | 
| 
 | 
   364 
 | 
| 
 | 
   365     Args:
 | 
| 
 | 
   366         model (cobra.Model): The COBRA model to analyze.
 | 
| 
 | 
   367         model_name (str): Name of the model, used in filenames for output.
 | 
| 
 | 
   368         output_types (List[str]): Types of analysis to perform (pFBA, FVA, sensitivity).
 | 
| 
 | 
   369 
 | 
| 
 | 
   370     Returns:
 | 
| 
 | 
   371         List[pd.DataFrame]: List of DataFrames containing pFBA, FVA, and sensitivity analysis results.
 | 
| 
 | 
   372     """
 | 
| 
 | 
   373 
 | 
| 
 | 
   374     df_pFBA = pd.DataFrame()
 | 
| 
 | 
   375     df_FVA= pd.DataFrame()
 | 
| 
 | 
   376     df_sensitivity= pd.DataFrame()
 | 
| 
 | 
   377 
 | 
| 
 | 
   378     for output_type in output_types:
 | 
| 
 | 
   379         if(output_type == "pFBA"):
 | 
| 
 | 
   380             model.objective = "Biomass"
 | 
| 
 | 
   381             solution = cobra.flux_analysis.pfba(model)
 | 
| 
 | 
   382             fluxes = solution.fluxes
 | 
| 
419
 | 
   383             df_pFBA.loc[0,[rxn.id for rxn in model.reactions]] = fluxes.tolist()
 | 
| 
410
 | 
   384             df_pFBA = df_pFBA.reset_index(drop=True)
 | 
| 
 | 
   385             df_pFBA.index = [model_name]
 | 
| 
 | 
   386             df_pFBA = df_pFBA.astype(float).round(6)
 | 
| 
 | 
   387         elif(output_type == "FVA"):
 | 
| 
430
 | 
   388             fva = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=ARGS.perc_opt, processes=1).round(8)
 | 
| 
410
 | 
   389             columns = []
 | 
| 
 | 
   390             for rxn in fva.index.to_list():
 | 
| 
 | 
   391                 columns.append(rxn + "_min")
 | 
| 
 | 
   392                 columns.append(rxn + "_max")
 | 
| 
 | 
   393             df_FVA= pd.DataFrame(columns = columns)
 | 
| 
 | 
   394             for index_rxn, row in fva.iterrows():
 | 
| 
 | 
   395                 df_FVA.loc[0, index_rxn+ "_min"] = fva.loc[index_rxn, "minimum"]
 | 
| 
 | 
   396                 df_FVA.loc[0, index_rxn+ "_max"] = fva.loc[index_rxn, "maximum"]
 | 
| 
 | 
   397             df_FVA = df_FVA.reset_index(drop=True)
 | 
| 
 | 
   398             df_FVA.index = [model_name]
 | 
| 
 | 
   399             df_FVA = df_FVA.astype(float).round(6)
 | 
| 
 | 
   400         elif(output_type == "sensitivity"):
 | 
| 
 | 
   401             model.objective = "Biomass"
 | 
| 
 | 
   402             solution_original = model.optimize().objective_value
 | 
| 
 | 
   403             reactions = model.reactions
 | 
| 
 | 
   404             single = cobra.flux_analysis.single_reaction_deletion(model)
 | 
| 
 | 
   405             newRow = []
 | 
| 
 | 
   406             df_sensitivity = pd.DataFrame(columns = [rxn.id for rxn in reactions], index = [model_name])
 | 
| 
 | 
   407             for rxn in reactions:
 | 
| 
 | 
   408                 newRow.append(single.knockout[rxn.id].growth.values[0]/solution_original)
 | 
| 
 | 
   409             df_sensitivity.loc[model_name] = newRow
 | 
| 
 | 
   410             df_sensitivity = df_sensitivity.astype(float).round(6)
 | 
| 
 | 
   411     return df_pFBA, df_FVA, df_sensitivity
 | 
| 
 | 
   412 
 | 
| 
 | 
   413 ############################# main ###########################################
 | 
| 
 | 
   414 def main(args :List[str] = None) -> None:
 | 
| 
 | 
   415     """
 | 
| 
456
 | 
   416     Initialize and run sampling/analysis based on the frontend input arguments.
 | 
| 
410
 | 
   417 
 | 
| 
 | 
   418     Returns:
 | 
| 
 | 
   419         None
 | 
| 
 | 
   420     """
 | 
| 
 | 
   421 
 | 
| 
419
 | 
   422     num_processors = max(1, cpu_count() - 1)
 | 
| 
410
 | 
   423 
 | 
| 
 | 
   424     global ARGS
 | 
| 
 | 
   425     ARGS = process_args(args)
 | 
| 
 | 
   426 
 | 
| 
 | 
   427     if not os.path.exists(ARGS.output_path):
 | 
| 
 | 
   428         os.makedirs(ARGS.output_path)
 | 
| 
419
 | 
   429 
 | 
| 
 | 
   430     # --- Normalize inputs (the tool may pass comma-separated --input and either --name or --names) ---
 | 
| 
421
 | 
   431     ARGS.input_files = ARGS.input.split(",") if ARGS.input else []
 | 
| 
419
 | 
   432     ARGS.file_names = ARGS.name.split(",")
 | 
| 
 | 
   433     # output types (required) -> list
 | 
| 
421
 | 
   434     ARGS.output_types = ARGS.output_type.split(",") if ARGS.output_type else []
 | 
| 
419
 | 
   435     # optional analysis output types -> list or empty
 | 
| 
421
 | 
   436     ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") if ARGS.output_type_analysis else []
 | 
| 
419
 | 
   437 
 | 
| 
421
 | 
   438     print("=== INPUT FILES ===")
 | 
| 
422
 | 
   439     print(f"{ARGS.input_files}")
 | 
| 
 | 
   440     print(f"{ARGS.file_names}")
 | 
| 
 | 
   441     print(f"{ARGS.output_type}")
 | 
| 
 | 
   442     print(f"{ARGS.output_types}")
 | 
| 
 | 
   443     print(f"{ARGS.output_type_analysis}")
 | 
| 
410
 | 
   444     
 | 
| 
419
 | 
   445     if ARGS.model_and_bounds == "True":
 | 
| 
 | 
   446         # MODE 1: Model + bounds (separate files)
 | 
| 
 | 
   447         print("=== MODE 1: Model + Bounds (separate files) ===")
 | 
| 
 | 
   448         
 | 
| 
 | 
   449         # Load base model
 | 
| 
 | 
   450         if not ARGS.model_upload:
 | 
| 
 | 
   451             sys.exit("Error: model_upload is required for Mode 1")
 | 
| 
410
 | 
   452 
 | 
| 
419
 | 
   453         base_model = model_utils.build_cobra_model_from_csv(ARGS.model_upload)
 | 
| 
410
 | 
   454 
 | 
| 
419
 | 
   455         validation = model_utils.validate_model(base_model)
 | 
| 
 | 
   456 
 | 
| 
456
 | 
   457         print("\n=== MODEL VALIDATION ===")
 | 
| 
419
 | 
   458         for key, value in validation.items():
 | 
| 
 | 
   459             print(f"{key}: {value}")
 | 
| 
 | 
   460 
 | 
| 
456
 | 
   461         # Set solver verbosity to 1 to see warning and error messages only.
 | 
| 
419
 | 
   462         base_model.solver.configuration.verbosity = 1
 | 
| 
410
 | 
   463 
 | 
| 
456
 | 
   464         # Process each bounds file with the base model
 | 
| 
419
 | 
   465         results = Parallel(n_jobs=num_processors)(
 | 
| 
 | 
   466             delayed(model_sampler_with_bounds)(base_model, bounds_file, cell_name) 
 | 
| 
 | 
   467             for bounds_file, cell_name in zip(ARGS.input_files, ARGS.file_names)
 | 
| 
 | 
   468         )
 | 
| 
410
 | 
   469 
 | 
| 
419
 | 
   470     else:
 | 
| 
 | 
   471         # MODE 2: Multiple complete models
 | 
| 
 | 
   472         print("=== MODE 2: Multiple complete models ===")
 | 
| 
 | 
   473         
 | 
| 
 | 
   474         # Process each complete model file
 | 
| 
 | 
   475         results = Parallel(n_jobs=num_processors)(
 | 
| 
 | 
   476             delayed(perform_sampling_and_analysis)(model_utils.build_cobra_model_from_csv(model_file), cell_name) 
 | 
| 
 | 
   477             for model_file, cell_name in zip(ARGS.input_files, ARGS.file_names)
 | 
| 
 | 
   478         )
 | 
| 
410
 | 
   479 
 | 
| 
 | 
   480 
 | 
| 
 | 
   481     all_mean = pd.concat([result[0] for result in results], ignore_index=False)
 | 
| 
 | 
   482     all_median = pd.concat([result[1] for result in results], ignore_index=False)
 | 
| 
 | 
   483     all_quantiles = pd.concat([result[2] for result in results], ignore_index=False)
 | 
| 
 | 
   484 
 | 
| 
 | 
   485     if("mean" in ARGS.output_types):
 | 
| 
 | 
   486         all_mean = all_mean.fillna(0.0)
 | 
| 
 | 
   487         all_mean = all_mean.sort_index()
 | 
| 
 | 
   488         write_to_file(all_mean.T, "mean", True)
 | 
| 
 | 
   489 
 | 
| 
 | 
   490     if("median" in ARGS.output_types):
 | 
| 
 | 
   491         all_median = all_median.fillna(0.0)
 | 
| 
 | 
   492         all_median = all_median.sort_index()
 | 
| 
 | 
   493         write_to_file(all_median.T, "median", True)
 | 
| 
 | 
   494     
 | 
| 
 | 
   495     if("quantiles" in ARGS.output_types):
 | 
| 
 | 
   496         all_quantiles = all_quantiles.fillna(0.0)
 | 
| 
 | 
   497         all_quantiles = all_quantiles.sort_index()
 | 
| 
 | 
   498         write_to_file(all_quantiles.T, "quantiles", True)
 | 
| 
 | 
   499 
 | 
| 
 | 
   500     index_result = 3
 | 
| 
 | 
   501     if("pFBA" in ARGS.output_type_analysis):
 | 
| 
 | 
   502         all_pFBA = pd.concat([result[index_result] for result in results], ignore_index=False)
 | 
| 
 | 
   503         all_pFBA = all_pFBA.sort_index()
 | 
| 
 | 
   504         write_to_file(all_pFBA.T, "pFBA", True)
 | 
| 
 | 
   505         index_result+=1
 | 
| 
 | 
   506     if("FVA" in ARGS.output_type_analysis):
 | 
| 
 | 
   507         all_FVA= pd.concat([result[index_result] for result in results], ignore_index=False)
 | 
| 
 | 
   508         all_FVA = all_FVA.sort_index()
 | 
| 
 | 
   509         write_to_file(all_FVA.T, "FVA", True)
 | 
| 
 | 
   510         index_result+=1
 | 
| 
 | 
   511     if("sensitivity" in ARGS.output_type_analysis):
 | 
| 
 | 
   512         all_sensitivity = pd.concat([result[index_result] for result in results], ignore_index=False)
 | 
| 
 | 
   513         all_sensitivity = all_sensitivity.sort_index()
 | 
| 
 | 
   514         write_to_file(all_sensitivity.T, "sensitivity", True)
 | 
| 
 | 
   515 
 | 
| 
456
 | 
   516     return
 | 
| 
410
 | 
   517         
 | 
| 
 | 
   518 ##############################################################################
 | 
| 
 | 
if __name__ == "__main__":
    # Run the tool only when the file is executed as a script, not on import.
    main()