diff COBRAxy/src/ras_generator.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children fcdbc81feb45
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/COBRAxy/src/ras_generator.py	Sat Oct 25 14:55:13 2025 +0000
@@ -0,0 +1,574 @@
+"""
+Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.
+
+The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
+computes RAS per reaction for each sample/cell line, and writes a tabular output.
+"""
+from __future__ import division
+import sys
+import argparse
+import pandas as pd
+import numpy as np
+import utils.general_utils as utils
+from typing import List, Dict
+import ast
+
+# Optional imports for AnnData/COBRApy mode (only needed when RAS_computation is
+# used with an AnnData input, e.g. from sampling_main.py; not required by the CLI path below)
+try:
+    from progressbar import ProgressBar, Bar, Percentage
+    from scanpy import AnnData
+    from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes
+except ImportError:
+    # These are only needed for AnnData mode, not for ras_generator.py
+    pass
+
+ERRORS = []
+########################## argparse ##########################################
+ARGS :argparse.Namespace
+def process_args(args:List[str] = None) -> argparse.Namespace:
+    """
+    Processes command-line arguments.
+
+    Args:
+        args (list): List of command-line arguments.
+
+    Returns:
+        Namespace: An object containing parsed arguments.
+    """
+    parser = argparse.ArgumentParser(
+        usage = '%(prog)s [options]',
+        description = "Compute Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.")
+    
+    parser.add_argument("-rl", "--model_upload", type = str,
+        help = "path to input file containing the rules")
+
+    parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
+    # Galaxy converts files into .dat, this helps infer the original extension when needed.
+    
+    parser.add_argument(
+        '-n', '--none',
+        type = utils.Bool("none"), default = True,
+        help = 'ignore NaN values when evaluating GPR rules (e.g. A or NaN -> A)')
+    
+    parser.add_argument(
+        '-td', '--tool_dir',
+        type = str,
+        required = True, help = 'your tool directory')
+    
+    parser.add_argument(
+        '-ol', '--out_log',
+        type = str,
+        help = "Output log")    
+    
+    parser.add_argument(
+        '-in', '--input',
+        type = str,
+        help = 'input dataset')
+    
+    parser.add_argument(
+        '-ra', '--ras_output',
+        type = str,
+        required = True, help = 'ras output')
+
+    
+    return parser.parse_args(args)
+
+############################ dataset input ####################################
+def read_dataset(data :str, name :str) -> pd.DataFrame:
+    """
+    Read a tab-separated dataset and return it as a pandas DataFrame.
+
+    Args:
+        data (str): Path to the tab-separated file containing the dataset.
+        name (str): Name of the dataset, used in error messages.
+
+    Returns:
+        pandas.DataFrame: DataFrame containing the dataset (genes x samples).
+
+    Raises:
+        sys.exit: If the file is empty or has too few columns, execution is aborted.
+    """
+    try:
+        dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0)
+        dataset = dataset.astype(float)
+    except pd.errors.EmptyDataError:
+        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
+    if len(dataset.columns) < 2:
+        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
+    return dataset
+
+
+def load_custom_rules() -> Dict[str,str]:
+    """
+    Opens the custom rules file and extracts the GPR rules. Both tab- and comma-delimited
+    files are supported; the comma fallback requires an additional parsing pass.
+
+    Returns:
+        Dict[str, str]: dict mapping reaction IDs to GPR rule strings.
+    """
+    datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat
+
+    dict_rule = {}
+
+    # First, try using a tab delimiter
+    try:
+        rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False)
+
+        if not rows:
+            raise ValueError("Model tabular file is empty.")
+
+        if len(rows[0]) <= 1:
+            raise ValueError("Model tabular with 1 column is not supported.")
+
+        id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
+
+        for line in rows[1:]:
+            if len(line) <= idx_gpr:
+                utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
+                continue
+
+            dict_rule[line[id_idx]] = line[idx_gpr]
+
+    except Exception as e:
+        # If parsing with tabs fails, try a comma delimiter
+        try:
+            rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)
+
+            if not rows:
+                raise ValueError("Model tabular file is empty.")
+
+            if len(rows[0]) <= 1:
+                raise ValueError("Model tabular with 1 column is not supported.")
+
+            id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
+
+            # Parse again, this time reading the GPR column from comma-separated rows
+            for line in rows[1:]:
+                if len(line) <= idx_gpr:
+                    utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
+                    continue
+
+                dict_rule[line[id_idx]] = line[idx_gpr]
+
+        except Exception as e2:
+            raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")
+
+    if not dict_rule:
+        raise ValueError("No valid rules found in the uploaded file. Please check the file format.")
+
+    return dict_rule
+
+
+"""
+Class to compute the RAS values
+
+"""
+
+class RAS_computation:
+
+    def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None):
+        """
+        Initialize RAS computation with two possible input modes:
+        
+        Mode 1 (Original - for sampling_main.py):
+            adata: AnnData object with gene expression (cells × genes)
+            model: COBRApy model object with reactions and GPRs
+            
+        Mode 2 (New - for ras_generator.py):
+            dataset: pandas DataFrame with gene expression (genes × samples)
+            gene_rules: dict mapping reaction IDs to GPR strings
+            rules_total_string: list of all gene names in GPRs (for validation)
+        """
+        self._logic_operators = ['and', 'or', '(', ')']
+        self.val_nan = np.nan
+        
+        # Determine which mode we're in
+        if adata is not None and model is not None:
+            # Mode 1: AnnData + COBRApy model (original)
+            self._init_from_anndata(adata, model)
+        elif dataset is not None and gene_rules is not None:
+            # Mode 2: DataFrame + rules dict (ras_generator style)
+            self._init_from_dataframe(dataset, gene_rules, rules_total_string)
+        else:
+            raise ValueError(
+                "Invalid initialization. Provide either:\n"
+                "  - adata + model (for AnnData input), or\n"
+                "  - dataset + gene_rules (for DataFrame input)"
+            )
+    
+    def _normalize_gene_name(self, gene_name):
+        """Normalize gene names by replacing special characters."""
+        return gene_name.replace("-", "_").replace(":", "_")
+    
+    def _normalize_rule(self, rule):
+        """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names."""
+        rule = rule.replace("OR", "or").replace("AND", "and")
+        rule = rule.replace("(", "( ").replace(")", " )")
+        # Normalize gene names in the rule
+        tokens = rule.split()
+        normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens]
+        return " ".join(normalized_tokens)
+    
+    def _init_from_anndata(self, adata, model):
+        """Initialize from AnnData and COBRApy model (original mode)."""
+        # Build the dictionary for the GPRs
+        df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions])
+        gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions]
+        df_reactions['rule'] = gene_rules
+        df_reactions = df_reactions.reset_index()
+        df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
+        
+        self.dict_rule_reactions = df_reactions.to_dict()['index']
+
+        # build useful structures for RAS computation
+        self.model = model
+        self.count_adata = adata.copy()
+        
+        # Normalize gene names in both model and dataset
+        model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes]
+        dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index]
+        self.genes = pd.Index(dataset_genes).intersection(model_genes)
+        
+        if len(self.genes) == 0:
+            raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
+        
+        self.cell_ids = list(self.count_adata.obs.index.values)
+        # Get expression data with normalized gene names
+        self.count_df_filtered = self.count_adata.to_df().T
+        self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index]
+        self.count_df_filtered = self.count_df_filtered.loc[self.genes]
+    
+    def _init_from_dataframe(self, dataset, gene_rules, rules_total_string):
+        """Initialize from DataFrame and rules dict (ras_generator mode)."""
+        reactions = list(gene_rules.keys())
+        
+        # Build the dictionary for the GPRs
+        df_reactions = pd.DataFrame(index=reactions)
+        gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions]
+        df_reactions['rule'] = gene_rules_list
+        df_reactions = df_reactions.reset_index()
+        df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
+
+        self.dict_rule_reactions = df_reactions.to_dict()['index']
+
+        # build useful structures for RAS computation
+        self.model = None
+        self.count_adata = None
+        
+        # Normalize gene names in dataset
+        dataset_normalized = dataset.copy()
+        dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index]
+        
+        # Determine which genes are in both dataset and GPRs
+        if rules_total_string is not None:
+            rules_genes = [self._normalize_gene_name(g) for g in rules_total_string]
+            self.genes = dataset_normalized.index.intersection(rules_genes)
+        else:
+            # Extract all genes from rules
+            all_genes_in_rules = set()
+            for rule in gene_rules_list:
+                tokens = rule.split()
+                for token in tokens:
+                    if token not in self._logic_operators:
+                        all_genes_in_rules.add(token)
+            self.genes = dataset_normalized.index.intersection(all_genes_in_rules)
+        
+        if len(self.genes) == 0:
+            raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
+        
+        self.cell_ids = list(dataset_normalized.columns)
+        self.count_df_filtered = dataset_normalized.loc[self.genes]
+ 
+    def compute(self,
+                or_expression=np.sum,       # operation to apply for an OR expression (sum, max, mean)
+                and_expression=np.min,      # operation to apply for an AND expression (min, sum)
+                drop_na_rows=False,         # if True, remove all-NaN rows from the RAS matrix
+                drop_duplicates=False,      # if True, remove duplicated rows
+                ignore_nan=True,            # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A)
+                print_progressbar=True,     # if True, print the progress bar
+                add_count_metadata=True,    # if True, add cell metadata to the RAS AnnData
+                add_met_metadata=True,      # if True, add metadata from the metabolic model (GPRs and reaction compartments)
+                add_essential_reactions=False,
+                add_essential_genes=False
+                ):
+
+        self.or_function = or_expression
+        self.and_function = and_expression
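+        # With the defaults (or=np.sum, and=np.min): "A or B" -> A + B, "A and B" -> min(A, B)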
+        
+        ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan)
+        genes_not_mapped = set()  # Track genes not in dataset
+        
+        if print_progressbar:
+            pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start()
+        
+        # Process each unique GPR rule
+        for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()):
+            if len(rule) == 0:
+                # Empty rule - keep as NaN
+                pass
+            else:
+                # Extract genes from rule
+                rule_genes = [token for token in rule.split() if token not in self._logic_operators]
+                rule_genes_unique = list(set(rule_genes))
+                
+                # Which genes are in the dataset?
+                genes_present = [g for g in rule_genes_unique if g in self.genes]
+                genes_missing = [g for g in rule_genes_unique if g not in self.genes]
+                
+                if genes_missing:
+                    genes_not_mapped.update(genes_missing)
+                
+                if len(genes_present) == 0:
+                    # No genes in dataset - keep as NaN
+                    pass
+                elif len(genes_missing) > 0 and not ignore_nan:
+                    # Some genes missing and we don't ignore NaN - set to NaN
+                    pass
+                else:
+                    # Evaluate the GPR expression. Rules using a single operator type
+                    # (only "and" or only "or") are computed directly on the expression
+                    # matrix; single-gene and mixed rules go through the AST evaluator.
+                    rule_tokens = rule.split()
+                    check_only_and = ("and" in rule_tokens and "or" not in rule_tokens)  # AND-only rule
+                    check_only_or = ("or" in rule_tokens and "and" not in rule_tokens)   # OR-only rule
+                    if check_only_and or check_only_or:
+                        #or/and sequence
+                        matrix = self.count_df_filtered.loc[genes_present].values
+                        #compute for all cells
+                        if check_only_and: 
+                            ras_df[ind] = self.and_function(matrix, axis=0)
+                        else:
+                            ras_df[ind] = self.or_function(matrix, axis=0)
+                    else:
+                        # complex expression (e.g. A or (B and C))
+                        data = self.count_df_filtered.loc[genes_present]  # dataframe of genes in the GPRs
+                        tree = ast.parse(rule, mode="eval").body
+                        values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns]
+                        for j, values in enumerate(values_by_cell):
+                            ras_df[ind, j] = self._evaluate_ast(tree, values, self.or_function, self.and_function, ignore_nan)
+
+            if print_progressbar:
+                pbar.update(ind + 1)
+        
+        if print_progressbar:
+            pbar.finish()
+        
+        # Store genes not mapped for later use
+        self.genes_not_mapped = sorted(genes_not_mapped)
+        
+        # create the dataframe of ras (rules x samples)
+        ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids)
+        ras_df['Reactions'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()]
+        
+        reactions_common = pd.DataFrame()
+        reactions_common["Reactions"] = ras_df['Reactions']
+        reactions_common["proof2"] = ras_df['Reactions']
+        reactions_common = reactions_common.explode('Reactions')
+        reactions_common = reactions_common.set_index("Reactions")
+
+        ras_df = ras_df.explode("Reactions")
+        ras_df = ras_df.set_index("Reactions")
+
+        if drop_na_rows:
+            ras_df = ras_df.dropna(how="all")
+            
+        if drop_duplicates:
+            ras_df = ras_df.drop_duplicates()
+        
+        # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData
+        if self.count_adata is None:
+            return ras_df, self.genes_not_mapped
+        
+        # Original AnnData mode: create AnnData structure for RAS
+        ras_adata = AnnData(ras_df.T)
+
+        #add metadata
+        if add_count_metadata:
+            ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index]
+            ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x))
+            for el in self.count_adata.obs.columns:
+                ras_adata.obs["countmatrix_"+el]=self.count_adata.obs[el]
+
+        if add_met_metadata:
+            if self.model is not None and len(self.model.compartments) > 0:
+                ras_adata.var['compartments'] = [list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index]
+                ras_adata.var['compartments'] = ras_adata.var["compartments"].apply(lambda x: ",".join(x))
+            
+            if self.model is not None:
+                ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index]
+
+        if add_essential_reactions:
+            if self.model is not None:
+                essential_reactions=find_essential_reactions(self.model)
+                essential_reactions=[el.id for el in essential_reactions]            
+                ras_adata.var['essential reactions']=["yes" if el in essential_reactions else "no" for el in ras_adata.var.index]
+        
+        if add_essential_genes:
+            if self.model is not None:
+                essential_genes=find_essential_genes(self.model)
+                essential_genes=[el.id for el in essential_genes]
+                ras_adata.var['essential genes']=[" ".join([gene for gene in genes.split()  if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]]
+        
+        return ras_adata
+
+    def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan):
+        """
+        Evaluate a boolean expression using AST (Abstract Syntax Tree).
+        Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)).
+        
+        Args:
+            node: AST node to evaluate
+            values: Dictionary mapping gene names to their expression values
+            or_function: Function to apply for OR operations
+            and_function: Function to apply for AND operations
+            ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A)
+            
+        Returns:
+            Evaluated expression result (float or np.nan)
+        """
+        if isinstance(node, ast.BoolOp):
+            # Boolean operation (and/or)
+            vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values]
+            
+            if ignore_nan:
+                # Filter out None/NaN values
+                vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))]
+            
+            if not vals:
+                return np.nan
+            
+            if isinstance(node.op, ast.Or):
+                return or_function(vals)
+            elif isinstance(node.op, ast.And):
+                return and_function(vals)
+
+        elif isinstance(node, ast.Name):
+            # Variable (gene name)
+            return values.get(node.id, None)
+        elif isinstance(node, ast.Constant):
+            # Constant (shouldn't happen in GPRs, but handle it)
+            return values.get(str(node.value), None)
+        else:
+            raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}")
+
+
+# ============================================================================
+# STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY
+# ============================================================================
+
+def computeRAS(
+    dataset, 
+    gene_rules, 
+    rules_total_string,
+    or_function=np.sum,
+    and_function=np.min,
+    ignore_nan=True
+):
+    """
+    Compute RAS from tabular data and GPR rules (ras_generator.py compatible).
+    
+    This is a standalone function that wraps the RAS_computation class
+    to provide the same interface as ras_generator.py.
+    
+    Args:
+        dataset: pandas DataFrame with gene expression (genes × samples)
+        gene_rules: dict mapping reaction IDs to GPR strings
+        rules_total_string: list of all gene names in GPRs
+        or_function: function for OR operations (default: np.sum)
+        and_function: function for AND operations (default: np.min)
+        ignore_nan: if True, ignore NaN in GPR evaluation (default: True)
+    
+    Returns:
+        tuple: (ras_df, genes_not_mapped)
+            - ras_df: DataFrame with RAS values (reactions × samples)
+            - genes_not_mapped: list of genes in GPRs not found in dataset
+    """
+    # Create RAS computation object in DataFrame mode
+    ras_obj = RAS_computation(
+        dataset=dataset,
+        gene_rules=gene_rules,
+        rules_total_string=rules_total_string
+    )
+    
+    # Compute RAS
+    result = ras_obj.compute(
+        or_expression=or_function,
+        and_expression=and_function,
+        ignore_nan=ignore_nan,
+        print_progressbar=False,  # No progress bar for ras_generator
+        add_count_metadata=False,  # No metadata in DataFrame mode
+        add_met_metadata=False,
+        add_essential_reactions=False,
+        add_essential_genes=False
+    )
+    
+    # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode
+    return result
+
+def main(args:List[str] = None) -> None:
+    """
+    Initializes everything and sets the program in motion based on the frontend input arguments.
+    
+    Returns:
+        None
+    """
+    # get args from frontend (related xml)
+    global ARGS
+    ARGS = process_args(args)
+
+    # read dataset and remove versioning from gene names
+    dataset = read_dataset(ARGS.input, "dataset")
+    orig_gene_list=dataset.index.copy()
+    dataset.index =  [str(el.split(".")[0]) for el in dataset.index]  
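+    # e.g. "ENSG00000139618.15" -> "ENSG00000139618"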
+
+    #load GPR rules
+    rules = load_custom_rules()
+    
+    # build the list of all gene names (tokens) appearing in the GPRs
+    rules_total_string = ""
+    for _, rule in rules.items():
+        rules_total_string += rule.replace("(", "").replace(")", "") + " "
+    rules_total_string = list(set(rules_total_string.split(" ")))
+
+    if any(dataset.index.duplicated(keep=False)):
+        genes_duplicates = orig_gene_list[dataset.index.duplicated(keep=False)]
+        genes_duplicates_in_model = [elem for elem in genes_duplicates if elem in rules_total_string]
+
+        if len(genes_duplicates_in_model) > 0:  # metabolic genes have duplicated entries in the dataset
+            list_str = ", ".join(genes_duplicates_in_model)
+            list_genes = "ERROR: Duplicate entries in the gene dataset are present in one or more GPRs. The following metabolic genes are duplicated: " + list_str
+            raise ValueError(list_genes)
+        else:
+            list_str = ", ".join(genes_duplicates)
+            list_genes = "INFO: Duplicate entries in the gene dataset. The following genes are duplicated in the dataset but not mentioned in the GPRs: " + list_str
+            utils.logWarning(list_genes, ARGS.out_log)
+
+    # decide whether NaN values are ignored during GPR evaluation
+    # e.g. (A or NaN -> A) when True, (A or NaN -> NaN) when False
+    ignore_nan = ARGS.none
+    
+    # compute RAS
+    ras_df, genes_not_mapped = computeRAS(dataset, rules,
+                    rules_total_string,
+                    or_function=np.sum,    # operation to apply for OR expressions (max, sum, mean)
+                    and_function=np.min,   # operation to apply for AND expressions (min, sum)
+                    ignore_nan=ignore_nan)
+    
+    # save as a tab-separated file, replacing NaN with the string "None"
+    ras_df.replace([np.nan,None],"None").to_csv(ARGS.ras_output, sep = '\t')
+
+    # report genes mentioned in the GPRs but absent from the dataset
+    if len(genes_not_mapped) > 0:
+        genes_not_mapped_str = ", ".join(genes_not_mapped)
+        utils.logWarning(
+            "INFO: The following genes are mentioned in the GPR rules but don't appear in the dataset: " + genes_not_mapped_str,
+            ARGS.out_log)
+ 
+    print("Execution succeeded")
+
+###############################################################################
+if __name__ == "__main__":
+    main()
\ No newline at end of file