changeset 505:96f512dff490 draft

Uploaded
author francesco_lapi
date Wed, 01 Oct 2025 11:15:00 +0000
parents ef3df9b697b1
children ffc234ec80db
files COBRAxy/ras_generator.py COBRAxy/utils/model_utils.py
diffstat 2 files changed, 183 insertions(+), 481 deletions(-) [+]
line wrap: on
line diff
--- a/COBRAxy/ras_generator.py	Tue Sep 30 18:08:56 2025 +0000
+++ b/COBRAxy/ras_generator.py	Wed Oct 01 11:15:00 2025 +0000
@@ -7,12 +7,11 @@
 from __future__ import division
 import sys
 import argparse
-import collections
 import pandas as pd
-import pickle as pk
+import numpy as np
 import utils.general_utils as utils
-import utils.rule_parsing as ruleUtils
-from typing import Union, Optional, List, Dict, Tuple, TypeVar
+from typing import  List, Dict
+import ast
 
 ERRORS = []
 ########################## argparse ##########################################
@@ -82,467 +81,16 @@
         sys.exit: If the CSV file has the wrong format, the execution is aborted.
     """
     try:
-        dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
+        dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0)
+        dataset = dataset.astype(float)
     except pd.errors.EmptyDataError:
-        sys.exit('Execution aborted: wrong format of ' + name + '\n')
+        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
     if len(dataset.columns) < 2:
-        sys.exit('Execution aborted: wrong format of ' + name + '\n')
+        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
     return dataset
 
-############################ load id e rules ##################################
-def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
-    """
-    Load IDs and rules from a dictionary of reactions.
-
-    Args:
-        reactions (dict): A dictionary where keys are IDs and values are rules.
-
-    Returns:
-        tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
-    """
-    ids, rules = [], []
-    for key, value in reactions.items():
-            ids.append(key)
-            rules.append(value)
-    return (ids, rules)
-
 
-############################ gene #############################################
-def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
-    """
-    Process gene data to ensure correct formatting and handle duplicates.
-
-    Args:
-        gene (DataFrame): DataFrame containing gene data.
-        type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
-        name (str): Name of the dataset.
-        gene_custom (dict or None): Custom gene data dictionary if provided.
-
-    Returns:
-        dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
-    """
- 
-    for i in range(len(gene)):
-        tmp = gene.iloc[i, 0]
-        gene.iloc[i, 0] = tmp.strip().split('.')[0]
-
-    gene_dup = [item for item, count in 
-               collections.Counter(gene[gene.columns[0]]).items() if count > 1]
-    pat_dup = [item for item, count in 
-               collections.Counter(list(gene.columns)).items() if count > 1]
-    
-    gene_in_rule = None
-
-    if gene_dup:
-        if gene_custom == None:
-
-            if str(ARGS.rules_selector) == 'HMRcore':
-                gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
-            
-            elif str(ARGS.rules_selector) == 'Recon':
-                gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
-            
-            elif str(ARGS.rules_selector) == 'ENGRO2':
-                gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
-
-            utils.logWarning(f"{ARGS.tool_dir}'/local/pickle files/ENGRO2_genes.p'", ARGS.out_log)
-
-            gene_in_rule = gene_in_rule.get(type_gene)
-        
-        else:
-            gene_in_rule = gene_custom
-
-        tmp = []
-        for i in gene_dup:
-            if gene_in_rule.get(i) == 'ok':
-                tmp.append(i)
-        if tmp:
-            sys.exit('Execution aborted because gene ID '
-                     +str(tmp)+' in '+name+' is duplicated\n')
-    
-    if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
-    return (gene.set_index(gene.columns[0])).to_dict()
-
-############################ resolve ##########################################
-def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
-    """
-    Replace gene identifiers in a parsed rule expression with values from a dict.
-
-    Args:
-        l: Parsed rule as a nested list structure (strings, lists, and operators).
-        d: Dict mapping gene IDs to numeric values.
-
-    Returns:
-        tuple: (new_expression, not_found_genes)
-    """
-    tmp = []
-    err = []
-    while l:
-        if isinstance(l[0], list):
-            tmp_rules, tmp_err = replace_gene_value(l[0], d)
-            tmp.append(tmp_rules)
-            err.extend(tmp_err)
-        else:
-            value = replace_gene(l[0], d)
-            tmp.append(value)
-            if value == None:
-                err.append(l[0])
-        l = l[1:]
-    return (tmp, err)
-
-def replace_gene(l: str, d: Dict[str, Union[int, float]]) -> Union[int, float, None]:
-    """
-    Replace a single gene identifier with its corresponding value from a dictionary.
-
-    Args:
-        l (str): Gene identifier to replace.
-        d (dict): Dict mapping gene IDs to numeric values.
-
-    Returns:
-        float/int/None: Corresponding value from the dictionary if found, None otherwise.
-
-    Raises:
-        sys.exit: If the value associated with the gene identifier is not valid.
-    """
-    if l =='and' or l == 'or':
-        return l
-    else:
-        value = d.get(l, None)
-        if not(value == None or isinstance(value, (int, float))):
-            sys.exit('Execution aborted: ' + value + ' value not valid\n')
-        return value
-
-T = TypeVar("T", bound = Optional[Union[int, float]])
-def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
-    """
-    Compute the RAS value between two value and an operator ('and' or 'or').
-
-    Args:
-        val1(Optional(Union[float, int])): First value.
-        op (str): Operator ('and' or 'or').
-        val2(Optional(Union[float, int])): Second value.
-        cn (bool): Control boolean value.
-
-    Returns:
-        Optional(Union[float, int]): Result of the computation.
-    """
-    if val1 != None and val2 != None:
-        if op == 'and':
-            return min(val1, val2)
-        else:
-            return val1 + val2
-    elif op == 'and':
-        if cn is True:
-            if val1 != None:
-                return val1
-            elif val2 != None:
-                return val2
-            else:
-                return None
-        else:
-            return None
-    else:
-        if val1 != None:
-            return val1
-        elif val2 != None:
-            return val2
-        else:
-            return None
-
-# ris should be Literal[None] but Literal is not supported in Python 3.7
-def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
-    """
-    Control the format of the expression.
-
-    Args:
-        ris: Intermediate result.
-        l (list): Expression to control.
-        cn (bool): Control boolean value.
-
-    Returns:
-        Union[Literal[False], int, float]: Result of the control.
-    """
-    if len(l) == 1:
-        if isinstance(l[0], (float, int)) or l[0] == None:
-            return l[0]
-        elif isinstance(l[0], list):
-            return control(None, l[0], cn)
-        else:
-            return False
-    elif len(l) > 2:
-        return control_list(ris, l, cn)
-    else:
-        return False
-
-def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
-    """
-    Control the format of a list of expressions.
-
-    Args:
-        ris: Intermediate result.
-        l (list): List of expressions to control.
-        cn (bool): Control boolean value.
-
-    Returns:
-        Optional[Literal[False]]: Result of the control.
-    """
-    while l:
-        if len(l) == 1:
-            return False
-        elif (isinstance(l[0], (float, int)) or
-              l[0] == None) and l[1] in ['and', 'or']:
-            if isinstance(l[2], (float, int)) or l[2] == None:
-                ris = computes(l[0], l[1], l[2], cn)            
-            elif isinstance(l[2], list):
-                tmp = control(None, l[2], cn)
-                if tmp is False:
-                    return False
-                else:
-                    ris = computes(l[0], l[1], tmp, cn)
-            else:
-                return False
-            l = l[3:]
-        elif l[0] in ['and', 'or']:
-            if isinstance(l[1], (float, int)) or l[1] == None:
-                ris = computes(ris, l[0], l[1], cn)
-            elif isinstance(l[1], list):
-                tmp = control(None,l[1], cn)
-                if tmp is False:
-                    return False
-                else:
-                    ris = computes(ris, l[0], tmp, cn)
-            else:
-                return False
-            l = l[2:]
-        elif isinstance(l[0], list) and l[1] in ['and', 'or']:
-            if isinstance(l[2], (float, int)) or l[2] == None:
-                tmp = control(None, l[0], cn)
-                if tmp is False:
-                    return False
-                else:
-                    ris = computes(tmp, l[1], l[2], cn)
-            elif isinstance(l[2], list):
-                tmp = control(None, l[0], cn)
-                tmp2 = control(None, l[2], cn)
-                if tmp is False or tmp2 is False:
-                    return False
-                else:
-                    ris = computes(tmp, l[1], tmp2, cn)
-            else:
-                return False
-            l = l[3:]
-        else:
-            return False
-    return ris
-
-ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
-def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
-    """
-    Resolve rules using gene data to compute scores for each rule.
-
-    Args:
-        genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
-        rules (list): List of rules to resolve.
-        ids (list): List of IDs corresponding to the rules.
-        resolve_none (bool): Flag indicating whether to resolve None values in the rules.
-        name (str): Name of the dataset.
-
-    Returns:
-        tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
-    """
-    resolve_rules = {}
-    not_found = []
-    flag = False
-    for key, value in genes.items():
-        tmp_resolve = []
-        for i in range(len(rules)):
-            tmp = rules[i]
-            if tmp:
-                tmp, err = replace_gene_value(tmp, value)
-                if err:
-                    not_found.extend(err)
-                ris = control(None, tmp, resolve_none)
-                if ris is False or ris == None:
-                    tmp_resolve.append(None)
-                else:
-                    tmp_resolve.append(ris)
-                    flag = True
-            else:
-                tmp_resolve.append(None)    
-        resolve_rules[key] = tmp_resolve
-    
-    if flag is False:
-        utils.logWarning(
-            f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
-            ARGS.out_log)
-        
-        return (None, None)
-    
-    return (resolve_rules, list(set(not_found)))
-############################ create_ras #######################################
-def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
-    """
-    Create a RAS (Reaction Activity Score) file from resolved rules.
-
-    Args:
-        resolve_rules (dict): Dictionary containing resolved rules.
-        dataset_name (str): Name of the dataset.
-        rules (list): List of rules.
-        file (str): Path to the output RAS file.
-
-    Returns:
-        None
-    """
-    if resolve_rules is None:
-        utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
-
-    for geni in resolve_rules.values():
-        for i, valori in enumerate(geni):
-            if valori == None:
-                geni[i] = 'None'
-                
-    output_ras = pd.DataFrame.from_dict(resolve_rules)
-    
-    output_ras.insert(0, 'Reactions', ids)
-    output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
-    
-    text_file = open(file, "w")
-    
-    text_file.write(output_to_csv)
-    text_file.close()
-
-################################- NEW RAS COMPUTATION -################################
-Expr = Optional[Union[int, float]]
-Ras  = Expr
-def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
-    """
-    Generates the RAS scores for each cell line found in the dataset.
-
-    Args:
-        dataset (pd.DataFrame): Dataset containing gene values.
-        rules (dict): The dict containing reaction ids as keys and rules as values.
-    
-    Note:
-        Modifies dataset in place by setting the first column as index.
-    
-    Returns:
-        dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
-        where each key corresponds to a reaction ID and each value is its computed RAS score.
-    """
-    ras_values_by_cell_line = {}
-    dataset.set_index(dataset.columns[0], inplace=True)
-    
-    for cell_line_name in dataset.columns: #[1:]:
-        cell_line = dataset[cell_line_name].to_dict()
-        ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
-    return ras_values_by_cell_line
-
-def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
-    """
-    Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
-
-    Args:
-        value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
-        dataset : gene expression data of one cell line.
-
-    Returns:
-        dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
-    """
-    return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
-
-def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
-    """
-    Extracts the gene expression of the given gene from a cell line dataset.
-
-    Args:
-        dataset : gene expression data of one cell line.
-        name : gene name.
-    
-    Returns:
-        Expr : the gene's expression value.
-    """
-    expr = dataset.get(name, None)
-    if expr is None: ERRORS.append(name)
-  
-    return expr
-
-def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
-    """
-    Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior.
-
-    Args:
-        op_list (OpList): The OpList representing a rule with gene values.
-        dataset : gene expression data of one cell line.
-
-    Returns:
-        Ras: The computed RAS value for the given OpList.
-    """
-    op = op_list.op
-    ras_value :Ras = None
-    if not op: return get_gene_expr(dataset, op_list[0])
-    if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None
-
-    for i in range(len(op_list)):
-        item = op_list[i]
-        if isinstance(item, ruleUtils.OpList):
-            item = ras_op_list(item, dataset)
-
-        else:
-          item = get_gene_expr(dataset, item)
-
-        if item is None:
-          if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
-          continue
-
-        if ras_value is None:
-          ras_value = item
-        else:
-          ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)
-
-    return ras_value
-
-def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
-    """
-    Save computed RAS scores to ARGS.ras_output as a TSV file.
-
-    Args:
-        rasScores : the computed ras scores.
-        reactions : the list of reaction IDs, used as the first column.
-    
-    Returns:
-        None
-    """
-    for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
-        for reactId, score in scores.items():
-            if score is None: scores[reactId] = "None"
-
-    output_ras = pd.DataFrame.from_dict(rasScores)
-    output_ras.insert(0, 'Reactions', reactions)
-    output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
-
-############################ MAIN #############################################
-#TODO: not used but keep, it will be when the new translator dicts will be used.
-def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
-    """
-    Translate gene from any supported encoding to HugoID.
-
-    Args:
-        geneName (str): the name of the gene in its current encoding.
-        encoding (str): the encoding.
-        geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
-        and encodings in the current model, mapping each to the corresponding HugoID encoding.
-
-    Raises:
-        ValueError: When the gene isn't supported in the model.
-
-    Returns:
-        str: the gene in HugoID encoding.
-    """
-    supportedGenesInEncoding = geneTranslator[encoding]
-    if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
-    raise ValueError(f"Gene '{geneName}' not found. Please verify you are using the correct model.")
-
-def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
+def load_custom_rules() -> Dict[str,str]:
     """
     Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
     performed, significantly impacting the runtime.
@@ -570,11 +118,8 @@
                 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
                 continue
             
-            if line[idx_gpr] == "":
-                dict_rule[line[id_idx]] = ruleUtils.OpList([""])
-            else:
-                dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])
-                
+            dict_rule[line[id_idx]] = line[idx_gpr] 
+
     except Exception as e:
         # If parsing with tabs fails, try comma delimiter
         try:
@@ -594,10 +139,7 @@
                     utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
                     continue
                 
-                if line[idx_gpr] == "":
-                    dict_rule[line[id_idx]] = ruleUtils.OpList([""])
-                else:
-                    dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])
+                dict_rule[line[id_idx]] =line[idx_gpr]
                     
         except Exception as e2:
             raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")
@@ -608,6 +150,118 @@
     return dict_rule
 
 
+
+def computeRAS(
+            dataset,gene_rules,rules_total_string,
+            or_function=np.sum,    # type of operation to do in case of an or expression (max, sum, mean)
+            and_function=np.min,   # type of operation to do in case of an and expression(min, sum)
+            ignore_nan = True
+            ):
+
+
+    logic_operators = ['and', 'or', '(', ')']
+    reactions=list(gene_rules.keys())
+
+    # Build the dictionary for the GPRs
+    df_reactions = pd.DataFrame(index=reactions)
+    gene_rules=[gene_rules[reaction_id].replace("OR","or").replace("AND","and").replace("(","( ").replace(")"," )") for reaction_id in reactions]        
+    df_reactions['rule'] = gene_rules
+    df_reactions = df_reactions.reset_index()
+    df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
+
+    dict_rule_reactions = df_reactions.to_dict()['index']
+
+    # build useful structures for RAS computation
+    genes =dataset.index.intersection(rules_total_string)
+    
+    #check if there is one gene at least 
+    if len(genes)==0:
+        print("ERROR: no gene of the count matrix is in the metabolic model!")
+        print(" are you sure that the gene annotation is the same for the model and the count matrix?")
+        return -1
+    
+    cell_ids = list(dataset.columns)
+    count_df_filtered = dataset.loc[genes]
+    count_df_filtered = count_df_filtered.rename(index=lambda x: x.replace("-", "_"))
+
+    ras_df=np.full((len(dict_rule_reactions), len(cell_ids)), np.nan)
+
+    # for loop on rules
+    ind = 0       
+    for rule, reaction_ids in dict_rule_reactions.items():
+        if len(rule) != 0:
+            # there is one gene at least in the formula
+            rule_split = rule.split()
+            rule_split_elements = list(set(rule_split))                                # genes in formula
+            
+            # which genes are in the count matrix?                
+            genes_in_count_matrix = [el for el in rule_split_elements if el in genes]
+            genes_notin_count_matrix = [el for el in rule_split_elements if el not in genes]
+
+            if len(genes_in_count_matrix) > 0: #there is at least one gene in the count matrix
+                    if len(rule_split) == 1:
+                        #one gene --> one reaction
+                        ras_df[ind] = count_df_filtered.loc[genes_in_count_matrix]
+                    else:    
+                        if len(genes_notin_count_matrix) > 0 and ignore_nan == False:
+                                ras_df[ind] = np.nan
+                        else:                   
+                            # more genes in the formula
+                            check_only_and=("and" in rule and "or" not in rule) #only and
+                            check_only_or=("or" in rule and "and" not in rule)  #only or
+                            if check_only_and or check_only_or:
+                                #or/and sequence
+                                matrix = count_df_filtered.loc[genes_in_count_matrix].values
+                                #compute for all cells
+                                if check_only_and: 
+                                    ras_df[ind] = and_function(matrix, axis=0)
+                                else:
+                                    ras_df[ind] = or_function(matrix, axis=0)
+                            else:
+                                # complex expression (e.g. A or (B and C))
+                                data = count_df_filtered.loc[genes_in_count_matrix]  # dataframe of genes in the GPRs
+
+                                rule = rule.replace("-", "_")
+                                tree = ast.parse(rule, mode="eval").body
+                                values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns]
+                                for j, values in enumerate(values_by_cell):
+                                    ras_df[ind, j] = _evaluate_ast(tree, values, or_function, and_function)
+    
+        ind +=1
+    
+    #create the dataframe of ras (rules x samples)
+    ras_df= pd.DataFrame(data=ras_df,index=range(len(dict_rule_reactions)), columns=cell_ids)
+    ras_df['Reactions'] = [reaction_ids for rule,reaction_ids in dict_rule_reactions.items()]
+    
+    #create the reaction dataframe for ras (reactions x samples)
+    ras_df = ras_df.explode("Reactions").set_index("Reactions")
+
+    return ras_df
+
+# function to evalute a complex boolean expression e.g. A or (B and C)
+def _evaluate_ast( node, values,or_function,and_function):
+    if isinstance(node, ast.BoolOp):
+        # Ricorsione sugli argomenti
+        vals = [_evaluate_ast(v, values,or_function,and_function) for v in node.values]
+       
+        vals = [v for v in vals if v is not None]
+        if not vals:
+            return None
+      
+        vals = [np.array(v) if isinstance(v, (list, np.ndarray)) else v for v in vals]
+
+        if isinstance(node.op, ast.Or):
+            return or_function(vals)
+        elif isinstance(node.op, ast.And):
+            return and_function(vals)
+
+    elif isinstance(node, ast.Name):
+        return values.get(node.id, None)
+    elif isinstance(node, ast.Constant):
+        return node.value
+    else:
+        raise ValueError(f"Error in boolean expression: {ast.dump(node)}")
+
 def main(args:List[str] = None) -> None:
     """
     Initializes everything and sets the program in motion based on the fronted input arguments.
@@ -619,24 +273,41 @@
     global ARGS
     ARGS = process_args(args)
 
-    # read dataset
+    # read dataset and remove versioning from gene names
     dataset = read_dataset(ARGS.input, "dataset")
-    dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
+    dataset.index =  [str(el.split(".")[0]) for el in dataset.index]
 
-    # remove versioning from gene names
-    dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
-
+    #load GPR rules
     rules = load_custom_rules()
-    reactions = list(rules.keys())
+    
+    #create a list of all the gpr
+    rules_total_string=""
+    for id,rule in rules.items():
+        rules_total_string+=rule.replace("(","").replace(")","") + " "
+    rules_total_string=list(set(rules_total_string.split(" ")))
 
-    save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
-    if ERRORS: utils.logWarning(
-        f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
-        ARGS.out_log)  
+    #check if nan value must be ignored in the GPR 
+    print(ARGS.none,"\n\n\n*************",ARGS.none==True)
+    if ARGS.none:
+    #    #e.g. (A or nan --> A)
+        ignore_nan = True
+    else:
+        #e.g. (A or nan --> nan)
+        ignore_nan = False
+    
+    #compure ras
+    ras_df=computeRAS(dataset,rules,
+                    rules_total_string,
+                    or_function=np.sum,    # type of operation to do in case of an or expression (max, sum, mean)
+                    and_function=np.min,
+                    ignore_nan=ignore_nan)
+    
+    #save to csv and replace nan with None
+    ras_df.replace(np.nan,"None").to_csv(ARGS.ras_output, sep = '\t')
 
-
+    #print 
     print("Execution succeeded")
 
 ###############################################################################
 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file
--- a/COBRAxy/utils/model_utils.py	Tue Sep 30 18:08:56 2025 +0000
+++ b/COBRAxy/utils/model_utils.py	Wed Oct 01 11:15:00 2025 +0000
@@ -269,7 +269,36 @@
 
     return pd.DataFrame(pathway_data)
 
+def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
+    """Set reaction pathways based on 'Pathway_1', 'Pathway_2', ... columns in the dataframe."""
+    pathway_cols = [col for col in df.columns if col.startswith('Pathway_')]
+    if not pathway_cols:
+        print("No 'Pathway_' columns found, skipping pathway annotation")
+        return
+    
+    pathway_data = defaultdict(list)
+    
+    for idx, row in df.iterrows():
+        reaction_id = str(row['ReactionID']).strip()
+        if reaction_id not in model.reactions:
+            continue
+        
+        pathways = []
+        for col in pathway_cols:
+            if pd.notna(row[col]) and str(row[col]).strip():
+                pathways.append(str(row[col]).strip())
+        
+        if pathways:
+            
+            reaction = model.reactions.get_by_id(reaction_id)
+            if len(pathways) == 1:
+                reaction.annotation['pathways'] = pathways[0]
+            else:
+                reaction.annotation['pathways'] = pathways
 
+            pathway_data[reaction_id] = pathways
+    
+    print(f"Annotated {len(pathway_data)} reactions with pathways.")
 
 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
     """
@@ -366,6 +395,8 @@
     set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
 
     set_medium_from_data(model, df)
+
+    set_annotation_pathways_from_data(model, df)
     
     print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")