Mercurial > repos > bimib > cobraxy
comparison COBRAxy/flux_simulation_beta.py @ 419:ed2c1f9e20ba draft
Uploaded
author | francesco_lapi |
---|---|
date | Tue, 09 Sep 2025 09:08:17 +0000 |
parents | 6b015d3184ab |
children | f9fe44c65772 |
comparison
equal
deleted
inserted
replaced
418:919b5b71a61c | 419:ed2c1f9e20ba |
---|---|
7 import cobra | 7 import cobra |
8 import utils.CBS_backend as CBS_backend | 8 import utils.CBS_backend as CBS_backend |
9 from joblib import Parallel, delayed, cpu_count | 9 from joblib import Parallel, delayed, cpu_count |
10 from cobra.sampling import OptGPSampler | 10 from cobra.sampling import OptGPSampler |
11 import sys | 11 import sys |
12 import utils.general_utils as utils | 12 import utils.model_utils as model_utils |
13 | 13 |
14 | 14 |
15 ################################# process args ############################### | 15 ################################# process args ############################### |
16 def process_args(args :List[str] = None) -> argparse.Namespace: | 16 def process_args(args :List[str] = None) -> argparse.Namespace: |
17 """ | 17 """ |
27 description = 'process some value\'s') | 27 description = 'process some value\'s') |
28 | 28 |
29 parser.add_argument("-mo", "--model_upload", type = str, | 29 parser.add_argument("-mo", "--model_upload", type = str, |
30 help = "path to input file with custom rules, if provided") | 30 help = "path to input file with custom rules, if provided") |
31 | 31 |
32 parser.add_argument("-mab", "--model_and_bounds", type = str, | |
33 choices = ['True', 'False'], | |
34 required = True, | |
35 help = "upload mode: True for model+bounds, False for complete models") | |
36 | |
37 | |
32 parser.add_argument('-ol', '--out_log', | 38 parser.add_argument('-ol', '--out_log', |
33 help = "Output log") | 39 help = "Output log") |
34 | 40 |
35 parser.add_argument('-td', '--tool_dir', | 41 parser.add_argument('-td', '--tool_dir', |
36 type = str, | 42 type = str, |
37 required = True, | 43 required = True, |
38 help = 'your tool directory') | 44 help = 'your tool directory') |
39 | 45 |
40 parser.add_argument('-in', '--input', | 46 parser.add_argument('-in', '--input', |
41 required = True, | 47 required = True, |
42 type=str, | 48 type=str, |
43 help = 'inputs bounds') | 49 help = 'input bounds files or complete model files') |
44 | 50 |
45 parser.add_argument('-ni', '--names', | 51 parser.add_argument('-ni', '--name', |
46 required = True, | 52 required = True, |
47 type=str, | 53 type=str, |
48 help = 'cell names') | 54 help = 'cell names') |
49 | 55 |
50 parser.add_argument('-a', '--algorithm', | 56 parser.add_argument('-a', '--algorithm', |
213 for i in range(0, n_batches): | 219 for i in range(0, n_batches): |
214 os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv') | 220 os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv') |
215 pass | 221 pass |
216 | 222 |
217 | 223 |
218 def model_sampler(model_input_original:cobra.Model, bounds_path:str, cell_name:str)-> List[pd.DataFrame]: | 224 |
219 """ | 225 def model_sampler_with_bounds(model_input_original: cobra.Model, bounds_path: str, cell_name: str) -> List[pd.DataFrame]: |
220 Prepares the model with bounds from the dataset and performs sampling and analysis based on the selected algorithm. | 226 """ |
227 MODE 1: Prepares the model with bounds from separate bounds file and performs sampling. | |
221 | 228 |
222 Args: | 229 Args: |
223 model_input_original (cobra.Model): The original COBRA model. | 230 model_input_original (cobra.Model): The original COBRA model. |
224 bounds_path (str): Path to the CSV file containing the bounds dataset. | 231 bounds_path (str): Path to the CSV file containing the bounds dataset. |
225 cell_name (str): Name of the cell, used to generate filenames for output. | 232 cell_name (str): Name of the cell, used to generate filenames for output. |
228 List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results. | 235 List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results. |
229 """ | 236 """ |
230 | 237 |
231 model_input = model_input_original.copy() | 238 model_input = model_input_original.copy() |
232 bounds_df = read_dataset(bounds_path, "bounds dataset") | 239 bounds_df = read_dataset(bounds_path, "bounds dataset") |
240 | |
241 # Apply bounds to model | |
233 for rxn_index, row in bounds_df.iterrows(): | 242 for rxn_index, row in bounds_df.iterrows(): |
234 model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound | 243 try: |
235 model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound | 244 model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound |
236 | 245 model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound |
246 except KeyError: | |
247 warning(f"Warning: Reaction {rxn_index} not found in model. Skipping.") | |
248 | |
249 return perform_sampling_and_analysis(model_input, cell_name) | |
250 | |
251 | |
252 def perform_sampling_and_analysis(model_input: cobra.Model, cell_name: str) -> List[pd.DataFrame]: | |
253 """ | |
254 Common function to perform sampling and analysis on a prepared model. | |
255 | |
256 Args: | |
257 model_input (cobra.Model): The prepared COBRA model with bounds applied. | |
258 cell_name (str): Name of the cell, used to generate filenames for output. | |
259 | |
260 Returns: | |
261 List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results. | |
262 """ | |
237 | 263 |
238 if ARGS.algorithm == 'OPTGP': | 264 if ARGS.algorithm == 'OPTGP': |
239 OPTGP_sampler(model_input, cell_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) | 265 OPTGP_sampler(model_input, cell_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) |
240 | |
241 elif ARGS.algorithm == 'CBS': | 266 elif ARGS.algorithm == 'CBS': |
242 CBS_sampler(model_input, cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) | 267 CBS_sampler(model_input, cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) |
243 | 268 |
244 df_mean, df_median, df_quantiles = fluxes_statistics(cell_name, ARGS.output_types) | 269 df_mean, df_median, df_quantiles = fluxes_statistics(cell_name, ARGS.output_types) |
245 | 270 |
246 if("fluxes" not in ARGS.output_types): | 271 if("fluxes" not in ARGS.output_types): |
247 os.remove(ARGS.output_path + "/" + cell_name + '.csv') | 272 os.remove(ARGS.output_path + "/" + cell_name + '.csv') |
248 | 273 |
249 returnList = [] | 274 returnList = [df_mean, df_median, df_quantiles] |
250 returnList.append(df_mean) | |
251 returnList.append(df_median) | |
252 returnList.append(df_quantiles) | |
253 | 275 |
254 df_pFBA, df_FVA, df_sensitivity = fluxes_analysis(model_input, cell_name, ARGS.output_type_analysis) | 276 df_pFBA, df_FVA, df_sensitivity = fluxes_analysis(model_input, cell_name, ARGS.output_type_analysis) |
255 | 277 |
256 if("pFBA" in ARGS.output_type_analysis): | 278 if("pFBA" in ARGS.output_type_analysis): |
257 returnList.append(df_pFBA) | 279 returnList.append(df_pFBA) |
330 for output_type in output_types: | 352 for output_type in output_types: |
331 if(output_type == "pFBA"): | 353 if(output_type == "pFBA"): |
332 model.objective = "Biomass" | 354 model.objective = "Biomass" |
333 solution = cobra.flux_analysis.pfba(model) | 355 solution = cobra.flux_analysis.pfba(model) |
334 fluxes = solution.fluxes | 356 fluxes = solution.fluxes |
335 df_pFBA.loc[0,[rxn._id for rxn in model.reactions]] = fluxes.tolist() | 357 df_pFBA.loc[0,[rxn.id for rxn in model.reactions]] = fluxes.tolist() |
336 df_pFBA = df_pFBA.reset_index(drop=True) | 358 df_pFBA = df_pFBA.reset_index(drop=True) |
337 df_pFBA.index = [model_name] | 359 df_pFBA.index = [model_name] |
338 df_pFBA = df_pFBA.astype(float).round(6) | 360 df_pFBA = df_pFBA.astype(float).round(6) |
339 elif(output_type == "FVA"): | 361 elif(output_type == "FVA"): |
340 fva = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0, processes=1).round(8) | 362 fva = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0, processes=1).round(8) |
369 | 391 |
370 Returns: | 392 Returns: |
371 None | 393 None |
372 """ | 394 """ |
373 | 395 |
374 num_processors = cpu_count() | 396 num_processors = max(1, cpu_count() - 1) |
375 | 397 |
376 global ARGS | 398 global ARGS |
377 ARGS = process_args(args) | 399 ARGS = process_args(args) |
378 | 400 |
379 if not os.path.exists(ARGS.output_path): | 401 if not os.path.exists(ARGS.output_path): |
380 os.makedirs(ARGS.output_path) | 402 os.makedirs(ARGS.output_path) |
381 | 403 |
382 #model_type :utils.Model = ARGS.model_selector | 404 #ARGS.bounds = ARGS.input.split(",") |
383 #if model_type is utils.Model.Custom: | 405 #ARGS.bounds_name = ARGS.name.split(",") |
384 # model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext) | 406 #ARGS.output_types = ARGS.output_type.split(",") |
385 #else: | 407 #ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") |
386 # model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir) | 408 |
387 | 409 # --- Normalize inputs (the tool may pass comma-separated --input and either --name or --names) --- |
388 model = utils.build_cobra_model_from_csv(ARGS.model_upload) | 410 ARGS.input_files = ARGS.input.split(",") if getattr(ARGS, "input", None) else [] |
389 | 411 ARGS.file_names = ARGS.name.split(",") |
390 validation = utils.validate_model(model) | 412 # output types (required) -> list |
391 | 413 ARGS.output_types = ARGS.output_type.split(",") if getattr(ARGS, "output_type", None) else [] |
392 print("\n=== VALIDAZIONE MODELLO ===") | 414 # optional analysis output types -> list or empty |
393 for key, value in validation.items(): | 415 ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") if getattr(ARGS, "output_type_analysis", None) else [] |
394 print(f"{key}: {value}") | 416 |
395 | 417 |
396 #Set solver verbosity to 1 to see warning and error messages only. | 418 if ARGS.model_and_bounds == "True": |
397 model.solver.configuration.verbosity = 1 | 419 # MODE 1: Model + bounds (separate files) |
398 | 420 print("=== MODE 1: Model + Bounds (separate files) ===") |
399 ARGS.bounds = ARGS.input.split(",") | 421 |
400 ARGS.bounds_name = ARGS.names.split(",") | 422 # Load base model |
401 ARGS.output_types = ARGS.output_type.split(",") | 423 if not ARGS.model_upload: |
402 ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") | 424 sys.exit("Error: model_upload is required for Mode 1") |
403 | 425 |
404 | 426 base_model = model_utils.build_cobra_model_from_csv(ARGS.model_upload) |
405 results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model, bounds_path, cell_name) for bounds_path, cell_name in zip(ARGS.bounds, ARGS.bounds_name)) | 427 |
428 validation = model_utils.validate_model(base_model) | |
429 | |
430 print("\n=== VALIDAZIONE MODELLO ===") | |
431 for key, value in validation.items(): | |
432 print(f"{key}: {value}") | |
433 | |
434 #Set solver verbosity to 1 to see warning and error messages only. | |
435 base_model.solver.configuration.verbosity = 1 | |
436 | |
437 # Process each bounds file with the base model | |
438 results = Parallel(n_jobs=num_processors)( | |
439 delayed(model_sampler_with_bounds)(base_model, bounds_file, cell_name) | |
440 for bounds_file, cell_name in zip(ARGS.input_files, ARGS.file_names) | |
441 ) | |
442 | |
443 else: | |
444 # MODE 2: Multiple complete models | |
445 print("=== MODE 2: Multiple complete models ===") | |
446 | |
447 # Process each complete model file | |
448 results = Parallel(n_jobs=num_processors)( | |
449 delayed(perform_sampling_and_analysis)(model_utils.build_cobra_model_from_csv(model_file), cell_name) | |
450 for model_file, cell_name in zip(ARGS.input_files, ARGS.file_names) | |
451 ) | |
452 | |
406 | 453 |
407 all_mean = pd.concat([result[0] for result in results], ignore_index=False) | 454 all_mean = pd.concat([result[0] for result in results], ignore_index=False) |
408 all_median = pd.concat([result[1] for result in results], ignore_index=False) | 455 all_median = pd.concat([result[1] for result in results], ignore_index=False) |
409 all_quantiles = pd.concat([result[2] for result in results], ignore_index=False) | 456 all_quantiles = pd.concat([result[2] for result in results], ignore_index=False) |
410 | 457 |