Mercurial > repos > bimib > marea_2
view marea_2/ras_to_bounds.py @ 107:d9b29c64efc9 draft
Uploaded
author | luca_milaz |
---|---|
date | Sun, 21 Jul 2024 18:47:53 +0000 |
parents | 10f8540eeec5 |
children | b5b7e46be22a |
line wrap: on
line source
import argparse
import os
import sys
from typing import Optional, List

import cobra
import numpy as np
import pandas as pd
from joblib import Parallel, delayed, cpu_count

import utils.general_utils as utils


################################# process args ###############################
def _str_to_bool(value :str) -> bool:
    """
    Convert a command-line token into a boolean.

    ``type=bool`` must not be used with argparse: ``bool("false")`` is True
    because every non-empty string is truthy.

    Args:
        value (str): The raw token received from the command line.

    Returns:
        bool: The parsed boolean value.

    Raises:
        argparse.ArgumentTypeError: If the token is not a recognised boolean spelling.
    """
    normalized = value.strip().lower()
    if normalized in ("true", "t", "yes", "y", "1"):
        return True
    # The empty string is kept falsy, as it was with the former ``type=bool``.
    if normalized in ("false", "f", "no", "n", "0", ""):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def process_args(args :List[str]) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments. Kept for interface
            compatibility only: it is not consulted, the parser reads the
            process arguments itself (``parse_args()`` is called without
            parameters).

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(
        usage = '%(prog)s [options]',
        description = 'process some value\'s')

    parser.add_argument(
        '-ms', '--model_selector',
        type = utils.Model, default = utils.Model.ENGRO2,
        choices = [utils.Model.ENGRO2, utils.Model.Custom],
        help = 'chose which type of model you want use')

    parser.add_argument(
        "-mo", "--model", type = str,
        help = "path to input file with custom rules, if provided")

    parser.add_argument(
        "-mn", "--model_name", type = str,
        help = "custom mode name")

    parser.add_argument(
        '-mes', '--medium_selector',
        default = "allOpen",
        help = 'chose which type of medium you want use')

    parser.add_argument(
        "-meo", "--medium", type = str,
        help = "path to input file with custom medium, if provided")

    parser.add_argument(
        '-ol', '--out_log',
        help = "Output log")

    parser.add_argument(
        '-td', '--tool_dir',
        type = str,
        required = True,
        help = 'your tool directory')

    parser.add_argument(
        '-ir', '--input_ras',
        required = False,
        help = 'input ras')

    # Parsed with _str_to_bool so that "false" really yields False.
    parser.add_argument(
        '-rs', '--ras_selector',
        required = True,
        type = _str_to_bool,
        help = 'ras selector')

    ARGS = parser.parse_args()
    return ARGS


########################### warning ###########################################
def warning(s :str) -> None:
    """
    Log a warning message to an output log file and print it to the console.

    The log path is read from the module-level ``ARGS.out_log``, which is only
    assigned once ``main`` has run.

    Args:
        s (str): The warning message to be logged and printed.

    Returns:
        None
    """
    with open(ARGS.out_log, 'a') as log:
        log.write(s + "\n\n")
    print(s)


############################ generate bounds ##################################
def generate_bounds(model:cobra.Model, medium:dict, ras=None) -> pd.DataFrame:
    """
    Compute reaction bounds for a model under a medium, optionally scaled by RAS.

    Works on a copy of ``model``; the model passed in is left untouched.
    Steps, in order:
      1. every reaction listed in ``medium`` gets ``lower_bound = -value``;
      2. flux variability analysis is run (``fraction_of_optimum=0``) and each
         reaction's bounds are replaced by its FVA minimum / maximum;
      3. if ``ras`` is given, each non-zero bound of a reaction found in
         ``ras`` is multiplied by ``ras[reaction]``.

    Args:
        model (cobra.Model): The model to derive bounds from.
        medium (dict): Maps reaction ids to an uptake value; missing entries
            (None or NaN) are skipped.
        ras: Optional mapping-like object supporting ``keys()`` and
            ``ras[reaction]``. NOTE(review): ``main`` passes a DataFrame, so
            ``ras[reaction]`` is a column and the ``float(...)`` conversion
            presumably only works for a single sample - confirm.

    Returns:
        pd.DataFrame: Indexed by reaction id with columns ``lower_bound`` and
        ``upper_bound`` taken from the modified copy.
    """
    model_new = model.copy()
    rxns_ids = [rxn.id for rxn in model.reactions]

    for reaction, value in medium.items():
        # pandas hands back NaN (not None) for empty cells; a NaN bound would
        # only fail later inside the solver, so missing values are skipped.
        if pd.isna(value):
            continue
        model_new.reactions.get_by_id(reaction).lower_bound = -float(value)

    df_FVA = cobra.flux_analysis.flux_variability_analysis(
        model_new, fraction_of_optimum=0, processes=1).round(8)

    for reaction in rxns_ids:
        # Both bounds are assigned in one step so that the pair is validated
        # together rather than against a stale opposite bound.
        model_new.reactions.get_by_id(reaction).bounds = (
            float(df_FVA.loc[reaction, "minimum"]),
            float(df_FVA.loc[reaction, "maximum"]))

    if ras is not None:
        for reaction in rxns_ids:
            if reaction not in ras.keys():
                continue

            rxn = model_new.reactions.get_by_id(reaction)
            lower_bound = rxn.lower_bound
            upper_bound = rxn.upper_bound
            valMax = float((upper_bound) * ras[reaction])
            valMin = float((lower_bound) * ras[reaction])

            # Only non-zero bounds are scaled; a zero bound stays zero.
            if upper_bound != 0 and lower_bound == 0:
                rxn.upper_bound = valMax
            elif upper_bound == 0 and lower_bound != 0:
                rxn.lower_bound = valMin
            elif upper_bound != 0 and lower_bound != 0:
                rxn.bounds = (valMin, valMax)

    rxns = [reaction.id for reaction in model_new.reactions]
    bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
    # Read from the modified copy: reading from `model` would return the
    # original bounds and throw away everything computed above.
    for reaction in model_new.reactions:
        bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
    return bounds


############################# main ###########################################
def main() -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    Parses the arguments, loads the optional RAS table, the model and the
    medium, then writes the bounds and the medium used as CSV files under
    ``ras_to_bounds/``.

    Returns:
        None
    """
    # `global` has to precede any use of the name inside the function.
    global ARGS
    ARGS = process_args(sys.argv)

    os.makedirs('ras_to_bounds', exist_ok=True)
    ARGS.output_folder = 'ras_to_bounds/'

    boundsPath = utils.FilePath("bounds", ".csv", prefix = ARGS.output_folder)
    mediumPath = utils.FilePath("medium", ".csv", prefix = ARGS.output_folder)

    ras = None
    if ARGS.ras_selector:
        ras = pd.read_table(ARGS.input_ras, header=0, sep=r'\s+', index_col = 0).T
        # np.nan is used as the replacement because `replace(x, None)` has
        # meant "pad-fill" in some pandas versions.
        ras = ras.replace("None", np.nan).astype(float)

    model_type :utils.Model = ARGS.model_selector
    if model_type is utils.Model.Custom:
        model = model_type.getCOBRAmodel(
            customPath = utils.FilePath.fromStrPath(ARGS.model),
            customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext)
    else:
        model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir)

    if ARGS.medium_selector == "Custom":
        # The option is declared as --medium, hence the attribute name.
        medium = pd.read_csv(ARGS.medium, index_col = 0)
        medium = medium.astype(float)
        medium = medium['medium'].to_dict()
    else:
        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
        medium = df_mediums[ARGS.medium_selector].to_dict()

    bounds = generate_bounds(model, medium, ras = ras)

    bounds.to_csv(boundsPath.show())
    # `medium` is a plain dict here; it is written as a one-column table named
    # "medium", the same layout read back above for a custom medium.
    pd.Series(medium, name="medium").to_frame().to_csv(mediumPath.show())


##############################################################################
if __name__ == "__main__":
    main()