view marea_2/ras_to_bounds.py @ 132:6c7354ba1461 draft

Uploaded
author luca_milaz
date Sun, 21 Jul 2024 20:39:27 +0000
parents 957b9f5006b0
children da75823f150a
line wrap: on
line source

import argparse
import utils.general_utils as utils
from typing import Optional, List
import os
import numpy as np
import pandas as pd
import cobra
import sys
import csv

################################# process args ###############################
def process_args(args :List[str]) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
                                     description = 'process some value\'s')
    
    parser.add_argument(
        '-ms', '--model_selector', 
        type = utils.Model, default = utils.Model.ENGRO2, choices = [utils.Model.ENGRO2, utils.Model.Custom],
        help = 'chose which type of model you want use')
    
    parser.add_argument("-mo", "--model", type = str,
        help = "path to input file with custom rules, if provided")
    
    parser.add_argument("-mn", "--model_name", type = str, help = "custom mode name")

    parser.add_argument(
        '-mes', '--medium_selector', 
        default = "allOpen",
        help = 'chose which type of medium you want use')
    
    parser.add_argument("-meo", "--medium", type = str,
        help = "path to input file with custom medium, if provided")

    parser.add_argument('-ol', '--out_log', 
                        help = "Output log")
    
    parser.add_argument('-td', '--tool_dir',
                        type = str,
                        required = True,
                        help = 'your tool directory')
    
    parser.add_argument('-ir', '--input_ras',
                        type=str,
                        required = False,
                        help = 'input ras')
    
    parser.add_argument('-rs', '--ras_selector',
                        required = True,
                        type=bool,
                        help = 'ras selector')
    
    ARGS = parser.parse_args()
    return ARGS

########################### warning ###########################################
def warning(s :str) -> None:
    """
    Log a warning message to an output log file and print it to the console.

    Args:
        s (str): The warning message to be logged and printed.
    
    Returns:
      None
    """
    with open(ARGS.out_log, 'a') as log:
        log.write(s + "\n\n")
    print(s)

############################ dataset input ####################################
def read_dataset(data :str, name :str) -> pd.DataFrame:
    """
    Read a dataset from a CSV file and return it as a pandas DataFrame.

    Args:
        data (str): Path to the CSV file containing the dataset.
        name (str): Name of the dataset, used in error messages.

    Returns:
        pandas.DataFrame: DataFrame containing the dataset.

    Raises:
        pd.errors.EmptyDataError: If the CSV file is empty.
        sys.exit: If the CSV file has the wrong format, the execution is aborted.
    """
    try:
        dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
    except pd.errors.EmptyDataError:
        sys.exit('Execution aborted: wrong format of ' + name + '\n')
    if len(dataset.columns) < 2:
        sys.exit('Execution aborted: wrong format of ' + name + '\n')
    return dataset

def generate_bounds(model:cobra.Model, medium:dict, ras=None) -> pd.DataFrame:
    model_new = model.copy()
    rxns_ids = []
    for rxn in model.reactions:
        rxns_ids.append(rxn.id)
    for reaction in medium.keys():
        if(medium[reaction] != None):
            model_new.reactions.get_by_id(reaction).lower_bound=-float(medium[reaction])
    df_FVA = cobra.flux_analysis.flux_variability_analysis(model_new,fraction_of_optimum=0,processes=1).round(8)
    for reaction in rxns_ids:
        model_new.reactions.get_by_id(reaction).lower_bound=float(df_FVA.loc[reaction,"minimum"])
        model_new.reactions.get_by_id(reaction).upper_bound=float(df_FVA.loc[reaction,"maximum"])

    if(ras is not  None):
        for cellName, ras in ras.iterrows():
            for reaction in rxns_ids:
                if (reaction in ras.keys() and ras[reaction] != None and pd.notna(ras[reaction])):
                    lower_bound=model_new.reactions.get_by_id(reaction).lower_bound
                    upper_bound=model_new.reactions.get_by_id(reaction).upper_bound
                    valMax=float((upper_bound)*ras[reaction])
                    valMin=float((lower_bound)*ras[reaction])
                    if upper_bound!=0 and lower_bound==0:
                        model_new.reactions.get_by_id(reaction).upper_bound=valMax
                    if upper_bound==0 and lower_bound!=0:
                        model_new.reactions.get_by_id(reaction).lower_bound=valMin
                    if upper_bound!=0 and lower_bound!=0:
                        model_new.reactions.get_by_id(reaction).lower_bound=valMin
                        model_new.reactions.get_by_id(reaction).upper_bound=valMax
            bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns_ids)
            for reaction in model.reactions:
                bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
            bounds.to_csv(ARGS.output_folder + cellName + ".csv", sep = '\t', index = False)
    else:
        for reaction in rxns_ids:
            lower_bound=model_new.reactions.get_by_id(reaction).lower_bound
            upper_bound=model_new.reactions.get_by_id(reaction).upper_bound
            valMax = float((upper_bound)*1)
            valMin=float((lower_bound)*1)
            if upper_bound!=0 and lower_bound==0:
                model_new.reactions.get_by_id(reaction).upper_bound=valMax
            if upper_bound==0 and lower_bound!=0:
                model_new.reactions.get_by_id(reaction).lower_bound=valMin
            if upper_bound!=0 and lower_bound!=0:
                model_new.reactions.get_by_id(reaction).lower_bound=valMin
                model_new.reactions.get_by_id(reaction).upper_bound=valMax
        bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns_ids)
        for reaction in model.reactions:
            bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
        bounds.to_csv(ARGS.output_folder + "bounds.csv", sep = '\t', index = False)

    
    pass


############################# main ###########################################
def main() -> None:
    """
    Initializes everything and sets the program in motion based on the fronted input arguments.

    Returns:
        None
    """
    if not os.path.exists('ras_to_bounds'):
        os.makedirs('ras_to_bounds')

    if not os.path.exists('ras_to_bounds_medium'):
        os.makedirs('ras_to_bounds_medium')

    global ARGS
    ARGS = process_args(sys.argv)

    ARGS.output_folder = 'ras_to_bounds/'

    mediumPath = utils.FilePath("medium", ".csv", prefix = 'ras_to_bounds_medium')

    if(ARGS.ras_selector == True):
        ras = read_dataset(ARGS.input_ras, "ras dataset")
        ras.replace("None", None, inplace=True)
        ras.set_index("Reactions", drop=True, inplace=True)
        ras = ras.T
        ras = ras.astype(float)
    
    model_type :utils.Model = ARGS.model_selector
    if model_type is utils.Model.Custom:
        model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext)
    else:
        model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir)

    if(ARGS.medium_selector == "Custom"):
        medium = read_dataset(ARGS.input_medium, "medium dataset")
        medium = medium.astype(float)
        medium = medium['medium'].to_dict()
    else:
        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
        medium = df_mediums[[ARGS.medium_selector]]
        medium = medium[ARGS.medium_selector].to_dict()

    if(ARGS.ras_selector == True):
        generate_bounds(model, medium, ras = ras)
    else:
        generate_bounds(model, medium)

    mediumDf = pd.DataFrame.from_dict(medium, orient='index', columns=["value"])
    mediumDf.to_csv(mediumPath.show(), sep = '\t')
    pass
        
##############################################################################
if __name__ == "__main__":
    main()