| 
542
 | 
     1 """
 | 
| 
 | 
     2 Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.
 | 
| 
 | 
     3 
 | 
| 
 | 
     4 The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
 | 
| 
 | 
     5 computes RAS per reaction for each sample/cell line, and writes a tabular output.
 | 
| 
 | 
     6 """
 | 
| 
 | 
     7 from __future__ import division
 | 
| 
 | 
     8 import sys
 | 
| 
 | 
     9 import argparse
 | 
| 
 | 
    10 import pandas as pd
 | 
| 
 | 
    11 import numpy as np
 | 
| 
 | 
    12 try:
 | 
| 
 | 
    13     from .utils import general_utils as utils
 | 
| 
 | 
    14 except:
 | 
| 
 | 
    15     import utils.general_utils as utils
 | 
| 
 | 
    16 from typing import List, Dict
 | 
| 
 | 
    17 import ast
 | 
| 
 | 
    18 
 | 
| 
 | 
    19 # Optional imports for AnnData mode (not used in ras_generator.py)
 | 
| 
 | 
    20 try:
 | 
| 
 | 
    21     from progressbar import ProgressBar, Bar, Percentage
 | 
| 
 | 
    22     from scanpy import AnnData
 | 
| 
 | 
    23     from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes
 | 
| 
 | 
    24 except ImportError:
 | 
| 
 | 
    25     # These are only needed for AnnData mode, not for ras_generator.py
 | 
| 
 | 
    26     pass
 | 
| 
 | 
    27 
 | 
| 
 | 
    28 ERRORS = []
 | 
| 
 | 
    29 ########################## argparse ##########################################
 | 
| 
 | 
    30 ARGS :argparse.Namespace
 | 
| 
 | 
    31 def process_args(args:List[str] = None) -> argparse.Namespace:
 | 
| 
 | 
    32     """
 | 
| 
 | 
    33     Processes command-line arguments.
 | 
| 
 | 
    34 
 | 
| 
 | 
    35     Args:
 | 
| 
 | 
    36         args (list): List of command-line arguments.
 | 
| 
 | 
    37 
 | 
| 
 | 
    38     Returns:
 | 
| 
 | 
    39         Namespace: An object containing parsed arguments.
 | 
| 
 | 
    40     """
 | 
| 
 | 
    41     parser = argparse.ArgumentParser(
 | 
| 
 | 
    42         usage = '%(prog)s [options]',
 | 
| 
 | 
    43         description = "process some value's genes to create a comparison's map.")
 | 
| 
 | 
    44     
 | 
| 
 | 
    45     parser.add_argument("-rl", "--model_upload", type = str,
 | 
| 
 | 
    46         help = "path to input file containing the rules")
 | 
| 
 | 
    47 
 | 
| 
 | 
    48     parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
 | 
| 
 | 
    49     # Galaxy converts files into .dat, this helps infer the original extension when needed.
 | 
| 
 | 
    50     
 | 
| 
 | 
    51     parser.add_argument(
 | 
| 
 | 
    52         '-n', '--none',
 | 
| 
 | 
    53         type = utils.Bool("none"), default = True,
 | 
| 
 | 
    54         help = 'compute Nan values')
 | 
| 
 | 
    55     
 | 
| 
 | 
    56     parser.add_argument(
 | 
| 
 | 
    57         '-td', '--tool_dir',
 | 
| 
 | 
    58         type = str,
 | 
| 
 | 
    59         default = os.path.dirname(os.path.abspath(__file__)),
 | 
| 
 | 
    60         help = 'your tool directory (default: auto-detected package location)')
 | 
| 
 | 
    61     
 | 
| 
 | 
    62     parser.add_argument(
 | 
| 
 | 
    63         '-ol', '--out_log',
 | 
| 
 | 
    64         type = str,
 | 
| 
 | 
    65         help = "Output log")    
 | 
| 
 | 
    66     
 | 
| 
 | 
    67     parser.add_argument(
 | 
| 
 | 
    68         '-in', '--input',
 | 
| 
 | 
    69         type = str,
 | 
| 
 | 
    70         help = 'input dataset')
 | 
| 
 | 
    71     
 | 
| 
 | 
    72     parser.add_argument(
 | 
| 
 | 
    73         '-ra', '--ras_output',
 | 
| 
 | 
    74         type = str,
 | 
| 
 | 
    75         required = True, help = 'ras output')
 | 
| 
 | 
    76 
 | 
| 
 | 
    77     
 | 
| 
 | 
    78     return parser.parse_args(args)
 | 
| 
 | 
    79 
 | 
| 
 | 
    80 ############################ dataset input ####################################
 | 
| 
 | 
    81 def read_dataset(data :str, name :str) -> pd.DataFrame:
 | 
| 
 | 
    82     """
 | 
| 
 | 
    83     Read a dataset from a CSV file and return it as a pandas DataFrame.
 | 
| 
 | 
    84 
 | 
| 
 | 
    85     Args:
 | 
| 
 | 
    86         data (str): Path to the CSV file containing the dataset.
 | 
| 
 | 
    87         name (str): Name of the dataset, used in error messages.
 | 
| 
 | 
    88 
 | 
| 
 | 
    89     Returns:
 | 
| 
 | 
    90         pandas.DataFrame: DataFrame containing the dataset.
 | 
| 
 | 
    91 
 | 
| 
 | 
    92     Raises:
 | 
| 
 | 
    93         pd.errors.EmptyDataError: If the CSV file is empty.
 | 
| 
 | 
    94         sys.exit: If the CSV file has the wrong format, the execution is aborted.
 | 
| 
 | 
    95     """
 | 
| 
 | 
    96     try:
 | 
| 
 | 
    97         dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0)
 | 
| 
 | 
    98         dataset = dataset.astype(float)
 | 
| 
 | 
    99     except pd.errors.EmptyDataError:
 | 
| 
 | 
   100         sys.exit('Execution aborted: wrong file format of ' + name + '\n')
 | 
| 
 | 
   101     if len(dataset.columns) < 2:
 | 
| 
 | 
   102         sys.exit('Execution aborted: wrong file format of ' + name + '\n')
 | 
| 
 | 
   103     return dataset
 | 
| 
 | 
   104 
 | 
| 
 | 
   105 
 | 
| 
 | 
   106 def load_custom_rules() -> Dict[str,str]:
 | 
| 
 | 
   107     """
 | 
| 
 | 
   108     Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
 | 
| 
 | 
   109     performed, significantly impacting the runtime.
 | 
| 
 | 
   110 
 | 
| 
 | 
   111     Returns:
 | 
| 
 | 
   112         Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
 | 
| 
 | 
   113     """
 | 
| 
 | 
   114     datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat
 | 
| 
 | 
   115 
 | 
| 
 | 
   116     dict_rule = {}
 | 
| 
 | 
   117 
 | 
| 
 | 
   118     try:
 | 
| 
 | 
   119         rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False)
 | 
| 
 | 
   120         if len(rows) <= 1:
 | 
| 
 | 
   121             raise ValueError("Model tabular with 1 column is not supported.")
 | 
| 
 | 
   122 
 | 
| 
 | 
   123         if not rows:
 | 
| 
 | 
   124             raise ValueError("Model tabular is file is empty.")
 | 
| 
 | 
   125         
 | 
| 
 | 
   126         id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
 | 
| 
 | 
   127         
 | 
| 
 | 
   128     # First, try using a tab delimiter
 | 
| 
 | 
   129         for line in rows[1:]:
 | 
| 
 | 
   130             if len(line) <= idx_gpr:
 | 
| 
 | 
   131                 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
 | 
| 
 | 
   132                 continue
 | 
| 
 | 
   133             
 | 
| 
 | 
   134             dict_rule[line[id_idx]] = line[idx_gpr] 
 | 
| 
 | 
   135 
 | 
| 
 | 
   136     except Exception as e:
 | 
| 
 | 
   137         # If parsing with tabs fails, try comma delimiter
 | 
| 
 | 
   138         try:
 | 
| 
 | 
   139             rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)
 | 
| 
 | 
   140             
 | 
| 
 | 
   141             if len(rows) <= 1:
 | 
| 
 | 
   142                 raise ValueError("Model tabular with 1 column is not supported.")
 | 
| 
 | 
   143 
 | 
| 
 | 
   144             if not rows:
 | 
| 
 | 
   145                 raise ValueError("Model tabular is file is empty.")
 | 
| 
 | 
   146             
 | 
| 
 | 
   147             id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
 | 
| 
 | 
   148             
 | 
| 
 | 
   149             # Try again parsing row content with the GPR column using comma-separated values
 | 
| 
 | 
   150             for line in rows[1:]:
 | 
| 
 | 
   151                 if len(line) <= idx_gpr:
 | 
| 
 | 
   152                     utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
 | 
| 
 | 
   153                     continue
 | 
| 
 | 
   154                 
 | 
| 
 | 
   155                 dict_rule[line[id_idx]] =line[idx_gpr]
 | 
| 
 | 
   156                     
 | 
| 
 | 
   157         except Exception as e2:
 | 
| 
 | 
   158             raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")
 | 
| 
 | 
   159 
 | 
| 
 | 
   160     if not dict_rule:
 | 
| 
 | 
   161             raise ValueError("No valid rules found in the uploaded file. Please check the file format.")
 | 
| 
 | 
   162     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
 | 
| 
 | 
   163     return dict_rule
 | 
| 
 | 
   164 
 | 
| 
 | 
   165 
 | 
| 
 | 
   166 """
 | 
| 
 | 
   167 Class to compute the RAS values
 | 
| 
 | 
   168 
 | 
| 
 | 
   169 """
 | 
| 
 | 
   170 
 | 
| 
 | 
   171 class RAS_computation:
 | 
| 
 | 
   172 
 | 
| 
 | 
   173     def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None):
 | 
| 
 | 
   174         """
 | 
| 
 | 
   175         Initialize RAS computation with two possible input modes:
 | 
| 
 | 
   176         
 | 
| 
 | 
   177         Mode 1 (Original - for sampling_main.py):
 | 
| 
 | 
   178             adata: AnnData object with gene expression (cells × genes)
 | 
| 
 | 
   179             model: COBRApy model object with reactions and GPRs
 | 
| 
 | 
   180             
 | 
| 
 | 
   181         Mode 2 (New - for ras_generator.py):
 | 
| 
 | 
   182             dataset: pandas DataFrame with gene expression (genes × samples)
 | 
| 
 | 
   183             gene_rules: dict mapping reaction IDs to GPR strings
 | 
| 
 | 
   184             rules_total_string: list of all gene names in GPRs (for validation)
 | 
| 
 | 
   185         """
 | 
| 
 | 
   186         self._logic_operators = ['and', 'or', '(', ')']
 | 
| 
 | 
   187         self.val_nan = np.nan
 | 
| 
 | 
   188         
 | 
| 
 | 
   189         # Determine which mode we're in
 | 
| 
 | 
   190         if adata is not None and model is not None:
 | 
| 
 | 
   191             # Mode 1: AnnData + COBRApy model (original)
 | 
| 
 | 
   192             self._init_from_anndata(adata, model)
 | 
| 
 | 
   193         elif dataset is not None and gene_rules is not None:
 | 
| 
 | 
   194             # Mode 2: DataFrame + rules dict (ras_generator style)
 | 
| 
 | 
   195             self._init_from_dataframe(dataset, gene_rules, rules_total_string)
 | 
| 
 | 
   196         else:
 | 
| 
 | 
   197             raise ValueError(
 | 
| 
 | 
   198                 "Invalid initialization. Provide either:\n"
 | 
| 
 | 
   199                 "  - adata + model (for AnnData input), or\n"
 | 
| 
 | 
   200                 "  - dataset + gene_rules (for DataFrame input)"
 | 
| 
 | 
   201             )
 | 
| 
 | 
   202     
 | 
| 
 | 
   203     def _normalize_gene_name(self, gene_name):
 | 
| 
 | 
   204         """Normalize gene names by replacing special characters."""
 | 
| 
 | 
   205         return gene_name.replace("-", "_").replace(":", "_")
 | 
| 
 | 
   206     
 | 
| 
 | 
   207     def _normalize_rule(self, rule):
 | 
| 
 | 
   208         """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names."""
 | 
| 
 | 
   209         rule = rule.replace("OR", "or").replace("AND", "and")
 | 
| 
 | 
   210         rule = rule.replace("(", "( ").replace(")", " )")
 | 
| 
 | 
   211         # Normalize gene names in the rule
 | 
| 
 | 
   212         tokens = rule.split()
 | 
| 
 | 
   213         normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens]
 | 
| 
 | 
   214         return " ".join(normalized_tokens)
 | 
| 
 | 
   215     
 | 
| 
 | 
   216     def _init_from_anndata(self, adata, model):
 | 
| 
 | 
   217         """Initialize from AnnData and COBRApy model (original mode)."""
 | 
| 
 | 
   218         # Build the dictionary for the GPRs
 | 
| 
 | 
   219         df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions])
 | 
| 
 | 
   220         gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions]
 | 
| 
 | 
   221         df_reactions['rule'] = gene_rules
 | 
| 
 | 
   222         df_reactions = df_reactions.reset_index()
 | 
| 
 | 
   223         df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
 | 
| 
 | 
   224         
 | 
| 
 | 
   225         self.dict_rule_reactions = df_reactions.to_dict()['index']
 | 
| 
 | 
   226 
 | 
| 
 | 
   227         # build useful structures for RAS computation
 | 
| 
 | 
   228         self.model = model
 | 
| 
 | 
   229         self.count_adata = adata.copy()
 | 
| 
 | 
   230         
 | 
| 
 | 
   231         # Normalize gene names in both model and dataset
 | 
| 
 | 
   232         model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes]
 | 
| 
 | 
   233         dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index]
 | 
| 
 | 
   234         self.genes = pd.Index(dataset_genes).intersection(model_genes)
 | 
| 
 | 
   235         
 | 
| 
 | 
   236         if len(self.genes) == 0:
 | 
| 
 | 
   237             raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
 | 
| 
 | 
   238         
 | 
| 
 | 
   239         self.cell_ids = list(self.count_adata.obs.index.values)
 | 
| 
 | 
   240         # Get expression data with normalized gene names
 | 
| 
 | 
   241         self.count_df_filtered = self.count_adata.to_df().T
 | 
| 
 | 
   242         self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index]
 | 
| 
 | 
   243         self.count_df_filtered = self.count_df_filtered.loc[self.genes]
 | 
| 
 | 
   244     
 | 
| 
 | 
   245     def _init_from_dataframe(self, dataset, gene_rules, rules_total_string):
 | 
| 
 | 
   246         """Initialize from DataFrame and rules dict (ras_generator mode)."""
 | 
| 
 | 
   247         reactions = list(gene_rules.keys())
 | 
| 
 | 
   248         
 | 
| 
 | 
   249         # Build the dictionary for the GPRs
 | 
| 
 | 
   250         df_reactions = pd.DataFrame(index=reactions)
 | 
| 
 | 
   251         gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions]
 | 
| 
 | 
   252         df_reactions['rule'] = gene_rules_list
 | 
| 
 | 
   253         df_reactions = df_reactions.reset_index()
 | 
| 
 | 
   254         df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
 | 
| 
 | 
   255 
 | 
| 
 | 
   256         self.dict_rule_reactions = df_reactions.to_dict()['index']
 | 
| 
 | 
   257 
 | 
| 
 | 
   258         # build useful structures for RAS computation
 | 
| 
 | 
   259         self.model = None
 | 
| 
 | 
   260         self.count_adata = None
 | 
| 
 | 
   261         
 | 
| 
 | 
   262         # Normalize gene names in dataset
 | 
| 
 | 
   263         dataset_normalized = dataset.copy()
 | 
| 
 | 
   264         dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index]
 | 
| 
 | 
   265         
 | 
| 
 | 
   266         # Determine which genes are in both dataset and GPRs
 | 
| 
 | 
   267         if rules_total_string is not None:
 | 
| 
 | 
   268             rules_genes = [self._normalize_gene_name(g) for g in rules_total_string]
 | 
| 
 | 
   269             self.genes = dataset_normalized.index.intersection(rules_genes)
 | 
| 
 | 
   270         else:
 | 
| 
 | 
   271             # Extract all genes from rules
 | 
| 
 | 
   272             all_genes_in_rules = set()
 | 
| 
 | 
   273             for rule in gene_rules_list:
 | 
| 
 | 
   274                 tokens = rule.split()
 | 
| 
 | 
   275                 for token in tokens:
 | 
| 
 | 
   276                     if token not in self._logic_operators:
 | 
| 
 | 
   277                         all_genes_in_rules.add(token)
 | 
| 
 | 
   278             self.genes = dataset_normalized.index.intersection(all_genes_in_rules)
 | 
| 
 | 
   279         
 | 
| 
 | 
   280         if len(self.genes) == 0:
 | 
| 
 | 
   281             raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
 | 
| 
 | 
   282         
 | 
| 
 | 
   283         self.cell_ids = list(dataset_normalized.columns)
 | 
| 
 | 
   284         self.count_df_filtered = dataset_normalized.loc[self.genes]
 | 
| 
 | 
   285  
 | 
| 
 | 
   286     def compute(self,
 | 
| 
 | 
   287                 or_expression=np.sum,       # type of operation to do in case of an or expression (sum, max, mean)
 | 
| 
 | 
   288                 and_expression=np.min,      # type of operation to do in case of an and expression(min, sum)
 | 
| 
 | 
   289                 drop_na_rows=False,          # if True remove the nan rows of the ras  matrix
 | 
| 
 | 
   290                 drop_duplicates=False,      # if true, remove duplicates rows
 | 
| 
 | 
   291                 ignore_nan=True,            # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A)
 | 
| 
 | 
   292                 print_progressbar=True,     # if True, print the progress bar
 | 
| 
 | 
   293                 add_count_metadata=True,    # if True add metadata of cells in the ras adata
 | 
| 
 | 
   294                 add_met_metadata=True,      # if True add metadata from the metabolic model (gpr and compartments of reactions)
 | 
| 
 | 
   295                 add_essential_reactions=False,
 | 
| 
 | 
   296                 add_essential_genes=False
 | 
| 
 | 
   297                 ):
 | 
| 
 | 
   298 
 | 
| 
 | 
   299         self.or_function = or_expression
 | 
| 
 | 
   300         self.and_function = and_expression
 | 
| 
 | 
   301         
 | 
| 
 | 
   302         ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan)
 | 
| 
 | 
   303         genes_not_mapped = set()  # Track genes not in dataset
 | 
| 
 | 
   304         
 | 
| 
 | 
   305         if print_progressbar:
 | 
| 
 | 
   306             pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start()
 | 
| 
 | 
   307         
 | 
| 
 | 
   308         # Process each unique GPR rule
 | 
| 
 | 
   309         for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()):
 | 
| 
 | 
   310             if len(rule) == 0:
 | 
| 
 | 
   311                 # Empty rule - keep as NaN
 | 
| 
 | 
   312                 pass
 | 
| 
 | 
   313             else:
 | 
| 
 | 
   314                 # Extract genes from rule
 | 
| 
 | 
   315                 rule_genes = [token for token in rule.split() if token not in self._logic_operators]
 | 
| 
 | 
   316                 rule_genes_unique = list(set(rule_genes))
 | 
| 
 | 
   317                 
 | 
| 
 | 
   318                 # Which genes are in the dataset?
 | 
| 
 | 
   319                 genes_present = [g for g in rule_genes_unique if g in self.genes]
 | 
| 
 | 
   320                 genes_missing = [g for g in rule_genes_unique if g not in self.genes]
 | 
| 
 | 
   321                 
 | 
| 
 | 
   322                 if genes_missing:
 | 
| 
 | 
   323                     genes_not_mapped.update(genes_missing)
 | 
| 
 | 
   324                 
 | 
| 
 | 
   325                 if len(genes_present) == 0:
 | 
| 
 | 
   326                     # No genes in dataset - keep as NaN
 | 
| 
 | 
   327                     pass
 | 
| 
 | 
   328                 elif len(genes_missing) > 0 and not ignore_nan:
 | 
| 
 | 
   329                     # Some genes missing and we don't ignore NaN - set to NaN
 | 
| 
 | 
   330                     pass
 | 
| 
 | 
   331                 else:
 | 
| 
 | 
   332                     # Evaluate the GPR expression using AST
 | 
| 
 | 
   333                     # For single gene, AST handles it fine: ast.parse("GENE_A") works
 | 
| 
 | 
   334                     # more genes in the formula
 | 
| 
 | 
   335                     check_only_and=("and" in rule and "or" not in rule) #only and
 | 
| 
 | 
   336                     check_only_or=("or" in rule and "and" not in rule)  #only or
 | 
| 
 | 
   337                     if check_only_and or check_only_or:
 | 
| 
 | 
   338                         #or/and sequence
 | 
| 
 | 
   339                         matrix = self.count_df_filtered.loc[genes_present].values
 | 
| 
 | 
   340                         #compute for all cells
 | 
| 
 | 
   341                         if check_only_and: 
 | 
| 
 | 
   342                             ras_df[ind] = self.and_function(matrix, axis=0)
 | 
| 
 | 
   343                         else:
 | 
| 
 | 
   344                             ras_df[ind] = self.or_function(matrix, axis=0)
 | 
| 
 | 
   345                     else:
 | 
| 
 | 
   346                         # complex expression (e.g. A or (B and C))
 | 
| 
 | 
   347                         data = self.count_df_filtered.loc[genes_present]  # dataframe of genes in the GPRs
 | 
| 
 | 
   348                         tree = ast.parse(rule, mode="eval").body
 | 
| 
 | 
   349                         values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns]
 | 
| 
 | 
   350                         for j, values in enumerate(values_by_cell):
 | 
| 
 | 
   351                             ras_df[ind, j] =self._evaluate_ast(tree, values, self.or_function, self.and_function, ignore_nan)
 | 
| 
 | 
   352 
 | 
| 
 | 
   353             if print_progressbar:
 | 
| 
 | 
   354                 pbar.update(ind + 1)
 | 
| 
 | 
   355         
 | 
| 
 | 
   356         if print_progressbar:
 | 
| 
 | 
   357             pbar.finish()
 | 
| 
 | 
   358         
 | 
| 
 | 
   359         # Store genes not mapped for later use
 | 
| 
 | 
   360         self.genes_not_mapped = sorted(genes_not_mapped)
 | 
| 
 | 
   361         
 | 
| 
 | 
   362         # create the dataframe of ras (rules x samples)
 | 
| 
 | 
   363         ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids)
 | 
| 
 | 
   364         ras_df['Reactions'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()]
 | 
| 
 | 
   365         
 | 
| 
 | 
   366         reactions_common = pd.DataFrame()
 | 
| 
 | 
   367         reactions_common["Reactions"] = ras_df['Reactions']
 | 
| 
 | 
   368         reactions_common["proof2"] = ras_df['Reactions']
 | 
| 
 | 
   369         reactions_common = reactions_common.explode('Reactions')
 | 
| 
 | 
   370         reactions_common = reactions_common.set_index("Reactions")
 | 
| 
 | 
   371 
 | 
| 
 | 
   372         ras_df = ras_df.explode("Reactions")
 | 
| 
 | 
   373         ras_df = ras_df.set_index("Reactions")
 | 
| 
 | 
   374 
 | 
| 
 | 
   375         if drop_na_rows:
 | 
| 
 | 
   376             ras_df = ras_df.dropna(how="all")
 | 
| 
 | 
   377             
 | 
| 
 | 
   378         if drop_duplicates:
 | 
| 
 | 
   379             ras_df = ras_df.drop_duplicates()
 | 
| 
 | 
   380         
 | 
| 
 | 
   381         # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData
 | 
| 
 | 
   382         if self.count_adata is None:
 | 
| 
 | 
   383             return ras_df, self.genes_not_mapped
 | 
| 
 | 
   384         
 | 
| 
 | 
   385         # Original AnnData mode: create AnnData structure for RAS
 | 
| 
 | 
   386         ras_adata = AnnData(ras_df.T)
 | 
| 
 | 
   387 
 | 
| 
 | 
   388         #add metadata
 | 
| 
 | 
   389         if add_count_metadata:
 | 
| 
 | 
   390             ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index]
 | 
| 
 | 
   391             ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x))
 | 
| 
 | 
   392             for el in self.count_adata.obs.columns:
 | 
| 
 | 
   393                 ras_adata.obs["countmatrix_"+el]=self.count_adata.obs[el]
 | 
| 
 | 
   394 
 | 
| 
 | 
   395         if add_met_metadata:
 | 
| 
 | 
   396             if self.model is not None and len(self.model.compartments)>0:
 | 
| 
 | 
   397                   ras_adata.var['compartments']=[list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index]  
 | 
| 
 | 
   398                   ras_adata.var['compartments']=ras_adata.var["compartments"].apply(lambda x: ",".join(x))
 | 
| 
 | 
   399             
 | 
| 
 | 
   400             if self.model is not None:
 | 
| 
 | 
   401                 ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index]
 | 
| 
 | 
   402 
 | 
| 
 | 
   403         if add_essential_reactions:
 | 
| 
 | 
   404             if self.model is not None:
 | 
| 
 | 
   405                 essential_reactions=find_essential_reactions(self.model)
 | 
| 
 | 
   406                 essential_reactions=[el.id for el in essential_reactions]            
 | 
| 
 | 
   407                 ras_adata.var['essential reactions']=["yes" if el in essential_reactions else "no" for el in ras_adata.var.index]
 | 
| 
 | 
   408         
 | 
| 
 | 
   409         if add_essential_genes:
 | 
| 
 | 
   410             if self.model is not None:
 | 
| 
 | 
   411                 essential_genes=find_essential_genes(self.model)
 | 
| 
 | 
   412                 essential_genes=[el.id for el in essential_genes]
 | 
| 
 | 
   413                 ras_adata.var['essential genes']=[" ".join([gene for gene in genes.split()  if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]]
 | 
| 
 | 
   414         
 | 
| 
 | 
   415         return ras_adata
 | 
| 
 | 
   416 
 | 
| 
 | 
   417     def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan):
 | 
| 
 | 
   418         """
 | 
| 
 | 
   419         Evaluate a boolean expression using AST (Abstract Syntax Tree).
 | 
| 
 | 
   420         Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)).
 | 
| 
 | 
   421         
 | 
| 
 | 
   422         Args:
 | 
| 
 | 
   423             node: AST node to evaluate
 | 
| 
 | 
   424             values: Dictionary mapping gene names to their expression values
 | 
| 
 | 
   425             or_function: Function to apply for OR operations
 | 
| 
 | 
   426             and_function: Function to apply for AND operations
 | 
| 
 | 
   427             ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A)
 | 
| 
 | 
   428             
 | 
| 
 | 
   429         Returns:
 | 
| 
 | 
   430             Evaluated expression result (float or np.nan)
 | 
| 
 | 
   431         """
 | 
| 
 | 
   432         if isinstance(node, ast.BoolOp):
 | 
| 
 | 
   433             # Boolean operation (and/or)
 | 
| 
 | 
   434             vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values]
 | 
| 
 | 
   435             
 | 
| 
 | 
   436             if ignore_nan:
 | 
| 
 | 
   437                 # Filter out None/NaN values
 | 
| 
 | 
   438                 vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))]
 | 
| 
 | 
   439             
 | 
| 
 | 
   440             if not vals:
 | 
| 
 | 
   441                 return np.nan
 | 
| 
 | 
   442             
 | 
| 
 | 
   443             if isinstance(node.op, ast.Or):
 | 
| 
 | 
   444                 return or_function(vals)
 | 
| 
 | 
   445             elif isinstance(node.op, ast.And):
 | 
| 
 | 
   446                 return and_function(vals)
 | 
| 
 | 
   447 
 | 
| 
 | 
   448         elif isinstance(node, ast.Name):
 | 
| 
 | 
   449             # Variable (gene name)
 | 
| 
 | 
   450             return values.get(node.id, None)
 | 
| 
 | 
   451         elif isinstance(node, ast.Constant):
 | 
| 
 | 
   452             # Constant (shouldn't happen in GPRs, but handle it)
 | 
| 
 | 
   453             return values.get(str(node.value), None)
 | 
| 
 | 
   454         else:
 | 
| 
 | 
   455             raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}")
 | 
| 
 | 
   456 
 | 
| 
 | 
   457 
 | 
| 
 | 
   458 # ============================================================================
 | 
| 
 | 
   459 # STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY
 | 
| 
 | 
   460 # ============================================================================
 | 
| 
 | 
   461 
 | 
| 
 | 
   462 def computeRAS(
 | 
| 
 | 
   463     dataset, 
 | 
| 
 | 
   464     gene_rules, 
 | 
| 
 | 
   465     rules_total_string,
 | 
| 
 | 
   466     or_function=np.sum,
 | 
| 
 | 
   467     and_function=np.min,
 | 
| 
 | 
   468     ignore_nan=True
 | 
| 
 | 
   469 ):
 | 
| 
 | 
   470     """
 | 
| 
 | 
   471     Compute RAS from tabular data and GPR rules (ras_generator.py compatible).
 | 
| 
 | 
   472     
 | 
| 
 | 
   473     This is a standalone function that wraps the RAS_computation class
 | 
| 
 | 
   474     to provide the same interface as ras_generator.py.
 | 
| 
 | 
   475     
 | 
| 
 | 
   476     Args:
 | 
| 
 | 
   477         dataset: pandas DataFrame with gene expression (genes × samples)
 | 
| 
 | 
   478         gene_rules: dict mapping reaction IDs to GPR strings
 | 
| 
 | 
   479         rules_total_string: list of all gene names in GPRs
 | 
| 
 | 
   480         or_function: function for OR operations (default: np.sum)
 | 
| 
 | 
   481         and_function: function for AND operations (default: np.min)
 | 
| 
 | 
   482         ignore_nan: if True, ignore NaN in GPR evaluation (default: True)
 | 
| 
 | 
   483     
 | 
| 
 | 
   484     Returns:
 | 
| 
 | 
   485         tuple: (ras_df, genes_not_mapped)
 | 
| 
 | 
   486             - ras_df: DataFrame with RAS values (reactions × samples)
 | 
| 
 | 
   487             - genes_not_mapped: list of genes in GPRs not found in dataset
 | 
| 
 | 
   488     """
 | 
| 
 | 
   489     # Create RAS computation object in DataFrame mode
 | 
| 
 | 
   490     ras_obj = RAS_computation(
 | 
| 
 | 
   491         dataset=dataset,
 | 
| 
 | 
   492         gene_rules=gene_rules,
 | 
| 
 | 
   493         rules_total_string=rules_total_string
 | 
| 
 | 
   494     )
 | 
| 
 | 
   495     
 | 
| 
 | 
   496     # Compute RAS
 | 
| 
 | 
   497     result = ras_obj.compute(
 | 
| 
 | 
   498         or_expression=or_function,
 | 
| 
 | 
   499         and_expression=and_function,
 | 
| 
 | 
   500         ignore_nan=ignore_nan,
 | 
| 
 | 
   501         print_progressbar=False,  # No progress bar for ras_generator
 | 
| 
 | 
   502         add_count_metadata=False,  # No metadata in DataFrame mode
 | 
| 
 | 
   503         add_met_metadata=False,
 | 
| 
 | 
   504         add_essential_reactions=False,
 | 
| 
 | 
   505         add_essential_genes=False
 | 
| 
 | 
   506     )
 | 
| 
 | 
   507     
 | 
| 
 | 
   508     # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode
 | 
| 
 | 
   509     return result
 | 
| 
 | 
   510 
 | 
| 
 | 
   511 def main(args:List[str] = None) -> None:
 | 
| 
 | 
   512     """
 | 
| 
 | 
   513     Initializes everything and sets the program in motion based on the fronted input arguments.
 | 
| 
 | 
   514     
 | 
| 
 | 
   515     Returns:
 | 
| 
 | 
   516         None
 | 
| 
 | 
   517     """
 | 
| 
 | 
   518     # get args from frontend (related xml)
 | 
| 
 | 
   519     global ARGS
 | 
| 
 | 
   520     ARGS = process_args(args)
 | 
| 
 | 
   521 
 | 
| 
 | 
   522     # read dataset and remove versioning from gene names
 | 
| 
 | 
   523     dataset = read_dataset(ARGS.input, "dataset")
 | 
| 
 | 
   524     orig_gene_list=dataset.index.copy()
 | 
| 
 | 
   525     dataset.index =  [str(el.split(".")[0]) for el in dataset.index]  
 | 
| 
 | 
   526 
 | 
| 
 | 
   527     #load GPR rules
 | 
| 
 | 
   528     rules = load_custom_rules()
 | 
| 
 | 
   529     
 | 
| 
 | 
   530     #create a list of all the gpr
 | 
| 
 | 
   531     rules_total_string=""
 | 
| 
 | 
   532     for id,rule in rules.items():
 | 
| 
 | 
   533         rules_total_string+=rule.replace("(","").replace(")","") + " "
 | 
| 
 | 
   534     rules_total_string=list(set(rules_total_string.split(" ")))
 | 
| 
 | 
   535 
 | 
| 
 | 
   536     if any(dataset.index.duplicated(keep=False)):
 | 
| 
 | 
   537         genes_duplicates=orig_gene_list[dataset.index.duplicated(keep=False)]
 | 
| 
 | 
   538         genes_duplicates_in_model=[elem for elem in genes_duplicates if elem in rules_total_string]
 | 
| 
 | 
   539 
 | 
| 
 | 
   540         if len(genes_duplicates_in_model)>0:#metabolic genes have duplicated entries in the dataset
 | 
| 
 | 
   541             list_str=", ".join(genes_duplicates_in_model)
 | 
| 
 | 
   542             list_genes=f"ERROR: Duplicate entries in the gene dataset present in one or more GPR. The following metabolic genes are duplicated: "+list_str
 | 
| 
 | 
   543             raise ValueError(list_genes)       
 | 
| 
 | 
   544         else:
 | 
| 
 | 
   545             list_str=", ".join(genes_duplicates)
 | 
| 
 | 
   546             list_genes=f"INFO: Duplicate entries in the gene dataset. The following genes are duplicated in the dataset but not mentioned in the GPRs: "+list_str           
 | 
| 
 | 
   547             utils.logWarning(list_genes,ARGS.out_log)  
 | 
| 
 | 
   548 
 | 
| 
 | 
   549     #check if nan value must be ignored in the GPR 
 | 
| 
 | 
   550     if ARGS.none:
 | 
| 
 | 
   551     #    #e.g. (A or nan --> A)
 | 
| 
 | 
   552         ignore_nan = True
 | 
| 
 | 
   553     else:
 | 
| 
 | 
   554         #e.g. (A or nan --> nan)
 | 
| 
 | 
   555         ignore_nan = False
 | 
| 
 | 
   556     
 | 
| 
 | 
   557     #compure ras
 | 
| 
 | 
   558     ras_df,genes_not_mapped=computeRAS(dataset,rules,
 | 
| 
 | 
   559                     rules_total_string,
 | 
| 
 | 
   560                     or_function=np.sum,    # type of operation to do in case of an or expression (max, sum, mean)
 | 
| 
 | 
   561                     and_function=np.min,
 | 
| 
 | 
   562                     ignore_nan=ignore_nan)
 | 
| 
 | 
   563     
 | 
| 
 | 
   564     #save to csv and replace nan with None
 | 
| 
 | 
   565     ras_df.replace([np.nan,None],"None").to_csv(ARGS.ras_output, sep = '\t')
 | 
| 
 | 
   566 
 | 
| 
 | 
   567     #report genes not present in the data
 | 
| 
 | 
   568     if len(genes_not_mapped)>0: 
 | 
| 
 | 
   569         genes_not_mapped_str=", ".join(genes_not_mapped)
 | 
| 
 | 
   570         utils.logWarning(
 | 
| 
 | 
   571         f"INFO: The following genes are mentioned in the GPR rules but don't appear in the dataset: "+genes_not_mapped_str,
 | 
| 
 | 
   572         ARGS.out_log)  
 | 
| 
 | 
   573  
 | 
| 
 | 
   574     print("Execution succeeded")
 | 
| 
 | 
   575 
 | 
| 
 | 
   576 ###############################################################################
 | 
| 
 | 
   577 if __name__ == "__main__":
 | 
| 
539
 | 
   578     main() |