"""
Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.

The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
computes RAS per reaction for each sample/cell line, and writes a tabular output.
"""
from __future__ import division
import sys
import argparse
import pandas as pd
import numpy as np
import utils.general_utils as utils
from typing import List, Dict
import ast

# Optional imports for AnnData mode (not used in ras_generator.py)
try:
    from progressbar import ProgressBar, Bar, Percentage
    from scanpy import AnnData
    from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes
except ImportError:
    # These are only needed for AnnData mode, not for ras_generator.py
    pass

ERRORS = []
########################## argparse ##########################################
ARGS: argparse.Namespace
def process_args(args: List[str] = None) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(
        usage = '%(prog)s [options]',
        description = "Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.")

    parser.add_argument("-rl", "--model_upload", type = str,
        help = "path to input file containing the rules")

    parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
    # Galaxy converts files into .dat; this helps infer the original extension when needed.

    parser.add_argument(
        '-n', '--none',
        type = utils.Bool("none"), default = True,
        help = 'if True, ignore NaN values when evaluating GPRs (e.g., A or NaN -> A)')

    parser.add_argument(
        '-td', '--tool_dir',
        type = str,
        required = True, help = 'your tool directory')

    parser.add_argument(
        '-ol', '--out_log',
        type = str,
        help = "Output log")

    parser.add_argument(
        '-in', '--input',
        type = str,
        help = 'input dataset')

    parser.add_argument(
        '-ra', '--ras_output',
        type = str,
        required = True, help = 'RAS output file')

    return parser.parse_args(args)

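# Example invocation (a hedged sketch; the script and file names below are
# placeholders, not files shipped with the tool; -td and -ra are required):
#
#   python this_script.py -td /path/to/tool_dir \
#       -rl rules.tsv -in expression.tsv -ra ras_output.tsv -ol run.log
#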
############################ dataset input ####################################
def read_dataset(data: str, name: str) -> pd.DataFrame:
    """
    Read a dataset from a tab-separated file and return it as a pandas DataFrame.

    Args:
        data (str): Path to the tab-separated file containing the dataset.
        name (str): Name of the dataset, used in error messages.

    Returns:
        pandas.DataFrame: DataFrame containing the dataset.

    Raises:
        pd.errors.EmptyDataError: If the file is empty.
        sys.exit: If the file has the wrong format, the execution is aborted.
    """
    try:
        dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0)
        dataset = dataset.astype(float)
    except pd.errors.EmptyDataError:
        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
    if len(dataset.columns) < 2:
        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
    return dataset


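# Expected layout of the expression dataset read above (a sketch with made-up
# gene IDs and values; rows are genes, columns are samples/cell lines, and all
# values must be numeric):
#
#   GeneID      Sample1   Sample2
#   ENSG0001    12.3      0.0
#   ENSG0002     5.1      7.8
#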
def load_custom_rules() -> Dict[str, str]:
    """
    Opens the custom rules file and extracts the rules. Tab-delimited parsing is
    tried first; if that fails, comma-delimited parsing is attempted.

    Returns:
        Dict[str, str]: dict mapping reaction IDs to GPR rule strings.
    """
    datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat

    dict_rule = {}

    try:
        # First, try using a tab delimiter
        rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False)

        if not rows:
            raise ValueError("Model tabular file is empty.")
        if len(rows) <= 1:
            raise ValueError("Model tabular file with no data rows is not supported.")

        id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")

        for line in rows[1:]:
            if len(line) <= idx_gpr:
                utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
                continue

            dict_rule[line[id_idx]] = line[idx_gpr]

    except Exception as e:
        # If parsing with tabs fails, try comma delimiter
        try:
            rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)

            if not rows:
                raise ValueError("Model tabular file is empty.")
            if len(rows) <= 1:
                raise ValueError("Model tabular file with no data rows is not supported.")

            id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")

            # Try again, parsing row content with the GPR column using comma-separated values
            for line in rows[1:]:
                if len(line) <= idx_gpr:
                    utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
                    continue

                dict_rule[line[id_idx]] = line[idx_gpr]

        except Exception as e2:
            raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")

    if not dict_rule:
        raise ValueError("No valid rules found in the uploaded file. Please check the file format.")

    # Rules are returned as raw GPR strings; they are parsed during RAS computation.
    return dict_rule


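# Expected layout of the rules file parsed by load_custom_rules (a sketch;
# reaction IDs, gene names, and the exact column set are illustrative, only a
# reaction-ID column and a column named "GPR" are assumed to be required):
#
#   ReactionID   GPR
#   R_HEX1       GENE_A or GENE_B
#   R_PFK        GENE_C and (GENE_D or GENE_E)
#
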
   162 """
 | 
| 
 | 
   163 Class to compute the RAS values
 | 
| 
505
 | 
   164 
 | 
| 
529
 | 
   165 """
 | 
| 
 | 
   166 
 | 
| 
 | 
   167 class RAS_computation:
 | 
| 
 | 
   168 
 | 
| 
 | 
    def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None):
        """
        Initialize RAS computation with two possible input modes:

        Mode 1 (Original - for sampling_main.py):
            adata: AnnData object with gene expression (cells × genes)
            model: COBRApy model object with reactions and GPRs

        Mode 2 (New - for ras_generator.py):
            dataset: pandas DataFrame with gene expression (genes × samples)
            gene_rules: dict mapping reaction IDs to GPR strings
            rules_total_string: list of all gene names in GPRs (for validation)
        """
        self._logic_operators = ['and', 'or', '(', ')']
        self.val_nan = np.nan

        # Determine which mode we're in
        if adata is not None and model is not None:
            # Mode 1: AnnData + COBRApy model (original)
            self._init_from_anndata(adata, model)
        elif dataset is not None and gene_rules is not None:
            # Mode 2: DataFrame + rules dict (ras_generator style)
            self._init_from_dataframe(dataset, gene_rules, rules_total_string)
        else:
            raise ValueError(
                "Invalid initialization. Provide either:\n"
                "  - adata + model (for AnnData input), or\n"
                "  - dataset + gene_rules (for DataFrame input)"
            )

    def _normalize_gene_name(self, gene_name):
        """Normalize gene names by replacing special characters."""
        return gene_name.replace("-", "_").replace(":", "_")

    def _normalize_rule(self, rule):
        """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names."""
        rule = rule.replace("OR", "or").replace("AND", "and")
        rule = rule.replace("(", "( ").replace(")", " )")
        # Normalize gene names in the rule
        tokens = rule.split()
        normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens]
        return " ".join(normalized_tokens)
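    # Illustrative example of the normalization above (made-up gene names):
    # "GENE_A AND (GENE-B OR GENE:C)" becomes "GENE_A and ( GENE_B or GENE_C )".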

    def _init_from_anndata(self, adata, model):
        """Initialize from AnnData and COBRApy model (original mode)."""
        # Build the dictionary for the GPRs
        df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions])
        gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions]
        df_reactions['rule'] = gene_rules
        df_reactions = df_reactions.reset_index()
        df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))

        self.dict_rule_reactions = df_reactions.to_dict()['index']

        # build useful structures for RAS computation
        self.model = model
        self.count_adata = adata.copy()

        # Normalize gene names in both model and dataset
        model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes]
        dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index]
        self.genes = pd.Index(dataset_genes).intersection(model_genes)

        if len(self.genes) == 0:
            raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")

        self.cell_ids = list(self.count_adata.obs.index.values)
        # Get expression data with normalized gene names
        self.count_df_filtered = self.count_adata.to_df().T
        self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index]
        self.count_df_filtered = self.count_df_filtered.loc[self.genes]

    def _init_from_dataframe(self, dataset, gene_rules, rules_total_string):
        """Initialize from DataFrame and rules dict (ras_generator mode)."""
        reactions = list(gene_rules.keys())

        # Build the dictionary for the GPRs
        df_reactions = pd.DataFrame(index=reactions)
        gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions]
        df_reactions['rule'] = gene_rules_list
        df_reactions = df_reactions.reset_index()
        df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))

        self.dict_rule_reactions = df_reactions.to_dict()['index']

        # build useful structures for RAS computation
        self.model = None
        self.count_adata = None

        # Normalize gene names in dataset
        dataset_normalized = dataset.copy()
        dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index]

        # Determine which genes are in both dataset and GPRs
        if rules_total_string is not None:
            rules_genes = [self._normalize_gene_name(g) for g in rules_total_string]
            self.genes = dataset_normalized.index.intersection(rules_genes)
        else:
            # Extract all genes from rules
            all_genes_in_rules = set()
            for rule in gene_rules_list:
                tokens = rule.split()
                for token in tokens:
                    if token not in self._logic_operators:
                        all_genes_in_rules.add(token)
            self.genes = dataset_normalized.index.intersection(all_genes_in_rules)

        if len(self.genes) == 0:
            raise ValueError("ERROR: No genes from the expression dataset match the genes in the GPR rules. Check that gene annotations are consistent between rules and dataset.")

        self.cell_ids = list(dataset_normalized.columns)
        self.count_df_filtered = dataset_normalized.loc[self.genes]

    def compute(self,
                or_expression=np.sum,       # operation to apply for an OR expression (sum, max, mean)
                and_expression=np.min,      # operation to apply for an AND expression (min, sum)
                drop_na_rows=False,         # if True, remove rows of the RAS matrix that are all NaN
                drop_duplicates=False,      # if True, remove duplicate rows
                ignore_nan=True,            # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A)
                print_progressbar=True,     # if True, print the progress bar
                add_count_metadata=True,    # if True, add cell metadata to the RAS AnnData
                add_met_metadata=True,      # if True, add metadata from the metabolic model (GPRs and reaction compartments)
                add_essential_reactions=False,
                add_essential_genes=False
                ):

        self.or_function = or_expression
        self.and_function = and_expression

        ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan)
        genes_not_mapped = set()  # Track genes not in dataset

        if print_progressbar:
            pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start()

        # Process each unique GPR rule
        for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()):
            if len(rule) == 0:
                # Empty rule - keep as NaN
                pass
            else:
                # Extract genes from rule
                rule_genes = [token for token in rule.split() if token not in self._logic_operators]
                rule_genes_unique = list(set(rule_genes))

                # Which genes are in the dataset?
                genes_present = [g for g in rule_genes_unique if g in self.genes]
                genes_missing = [g for g in rule_genes_unique if g not in self.genes]

                if genes_missing:
                    genes_not_mapped.update(genes_missing)

                if len(genes_present) == 0:
                    # No genes in dataset - keep as NaN
                    pass
                elif len(genes_missing) > 0 and not ignore_nan:
                    # Some genes missing and we don't ignore NaN - set to NaN
                    pass
                else:
                    check_only_and = ("and" in rule and "or" not in rule)  # only AND operators
                    check_only_or = ("or" in rule and "and" not in rule)   # only OR operators
                    if check_only_and or check_only_or:
                        # Plain AND/OR sequence: evaluate all cells at once
                        matrix = self.count_df_filtered.loc[genes_present].values
                        if check_only_and:
                            ras_df[ind] = self.and_function(matrix, axis=0)
                        else:
                            ras_df[ind] = self.or_function(matrix, axis=0)
                    else:
                        # Complex or single-gene expression (e.g. "A or (B and C)"):
                        # evaluate the GPR with an AST; ast.parse("GENE_A") also
                        # handles the single-gene case.
                        data = self.count_df_filtered.loc[genes_present]  # dataframe of genes in the GPRs
                        tree = ast.parse(rule, mode="eval").body
                        values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns]
                        for j, values in enumerate(values_by_cell):
                            ras_df[ind, j] = self._evaluate_ast(tree, values, self.or_function, self.and_function, ignore_nan)

            if print_progressbar:
                pbar.update(ind + 1)

        if print_progressbar:
            pbar.finish()

        # Store genes not mapped for later use
        self.genes_not_mapped = sorted(genes_not_mapped)

        # create the dataframe of ras (rules x samples)
        ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids)
        ras_df['Reactions'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()]

        reactions_common = pd.DataFrame()
        reactions_common["Reactions"] = ras_df['Reactions']
        reactions_common["proof2"] = ras_df['Reactions']
        reactions_common = reactions_common.explode('Reactions')
        reactions_common = reactions_common.set_index("Reactions")

        ras_df = ras_df.explode("Reactions")
        ras_df = ras_df.set_index("Reactions")

        if drop_na_rows:
            ras_df = ras_df.dropna(how="all")

        if drop_duplicates:
            ras_df = ras_df.drop_duplicates()

        # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData
        if self.count_adata is None:
            return ras_df, self.genes_not_mapped

        # Original AnnData mode: create AnnData structure for RAS
        ras_adata = AnnData(ras_df.T)

        # add metadata
        if add_count_metadata:
            ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index]
            ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x))
            for el in self.count_adata.obs.columns:
                ras_adata.obs["countmatrix_" + el] = self.count_adata.obs[el]

        if add_met_metadata:
            if self.model is not None and len(self.model.compartments) > 0:
                ras_adata.var['compartments'] = [list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index]
                ras_adata.var['compartments'] = ras_adata.var["compartments"].apply(lambda x: ",".join(x))

            if self.model is not None:
                ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index]

        if add_essential_reactions:
            if self.model is not None:
                essential_reactions = find_essential_reactions(self.model)
                essential_reactions = [el.id for el in essential_reactions]
                ras_adata.var['essential reactions'] = ["yes" if el in essential_reactions else "no" for el in ras_adata.var.index]

        if add_essential_genes:
            if self.model is not None:
                essential_genes = find_essential_genes(self.model)
                essential_genes = [el.id for el in essential_genes]
                ras_adata.var['essential genes'] = [" ".join([gene for gene in genes.split() if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]]

        return ras_adata

    def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan):
        """
        Evaluate a boolean expression using AST (Abstract Syntax Tree).
        Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)).

        Args:
            node: AST node to evaluate
            values: Dictionary mapping gene names to their expression values
            or_function: Function to apply for OR operations
            and_function: Function to apply for AND operations
            ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A)

        Returns:
            Evaluated expression result (float or np.nan)
        """
        if isinstance(node, ast.BoolOp):
            # Boolean operation (and/or)
            vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values]

            if ignore_nan:
                # Filter out None/NaN values
                vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))]

            if not vals:
                return np.nan

            if isinstance(node.op, ast.Or):
                return or_function(vals)
            elif isinstance(node.op, ast.And):
                return and_function(vals)

        elif isinstance(node, ast.Name):
            # Variable (gene name)
            return values.get(node.id, None)
        elif isinstance(node, ast.Constant):
            # Constant (shouldn't happen in GPRs, but handle it)
            return values.get(str(node.value), None)
        else:
            raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}")
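    # Worked example for _evaluate_ast (illustrative values, not from the source):
    # with rule "A or ( B and C )", values {"A": 2, "B": 3, "C": 1},
    # or_function=np.sum and and_function=np.min, the AND branch gives
    # min(3, 1) = 1 and the OR branch gives sum(2, 1) = 3, so the RAS is 3.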


# ============================================================================
# STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY
# ============================================================================

def computeRAS(
    dataset,
    gene_rules,
    rules_total_string,
    or_function=np.sum,
    and_function=np.min,
    ignore_nan=True
):
    """
    Compute RAS from tabular data and GPR rules (ras_generator.py compatible).

    This is a standalone function that wraps the RAS_computation class
    to provide the same interface as ras_generator.py.

    Args:
        dataset: pandas DataFrame with gene expression (genes × samples)
        gene_rules: dict mapping reaction IDs to GPR strings
        rules_total_string: list of all gene names in GPRs
        or_function: function for OR operations (default: np.sum)
        and_function: function for AND operations (default: np.min)
        ignore_nan: if True, ignore NaN in GPR evaluation (default: True)

    Returns:
        tuple: (ras_df, genes_not_mapped)
            - ras_df: DataFrame with RAS values (reactions × samples)
            - genes_not_mapped: list of genes in GPRs not found in dataset
    """
    # Create RAS computation object in DataFrame mode
    ras_obj = RAS_computation(
        dataset=dataset,
        gene_rules=gene_rules,
        rules_total_string=rules_total_string
    )

    # Compute RAS
    result = ras_obj.compute(
        or_expression=or_function,
        and_expression=and_function,
        ignore_nan=ignore_nan,
        print_progressbar=False,   # No progress bar for ras_generator
        add_count_metadata=False,  # No metadata in DataFrame mode
        add_met_metadata=False,
        add_essential_reactions=False,
        add_essential_genes=False
    )

    # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode
    return result
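# Minimal usage sketch for computeRAS (hypothetical expression values, gene and
# reaction IDs; assumes this module is importable and pandas/numpy are available):
#
#   expr = pd.DataFrame({"S1": [2.0, 3.0], "S2": [0.0, 1.0]},
#                       index=["GENE_A", "GENE_B"])
#   rules = {"R1": "GENE_A or GENE_B", "R2": "GENE_A and GENE_B"}
#   ras, unmapped = computeRAS(expr, rules, ["GENE_A", "GENE_B"])
#   # ras is a (reactions x samples) DataFrame; unmapped lists GPR genes that
#   # are absent from the expression table.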

def main(args: List[str] = None) -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    Returns:
        None
    """
    # get args from frontend (related xml)
    global ARGS
    ARGS = process_args(args)

    # read dataset and remove versioning from gene names
    dataset = read_dataset(ARGS.input, "dataset")
    orig_gene_list = dataset.index.copy()
    dataset.index = [str(el.split(".")[0]) for el in dataset.index]

    # load GPR rules
    rules = load_custom_rules()

    # create a list of all gene names mentioned in the GPRs
    rules_total_string = ""
    for rule in rules.values():
        rules_total_string += rule.replace("(", "").replace(")", "") + " "
    rules_total_string = list(set(rules_total_string.split(" ")))
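    # Illustrative example (made-up rule): {"R1": "A or (B and C)"} yields tokens
    # like ["A", "or", "B", "and", "C", ""]; the operator tokens and the empty
    # string are harmless here because they never match gene IDs in the dataset index.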

    if any(dataset.index.duplicated(keep=False)):
        genes_duplicates = orig_gene_list[dataset.index.duplicated(keep=False)]
        genes_duplicates_in_model = [elem for elem in genes_duplicates if elem in rules_total_string]

        if len(genes_duplicates_in_model) > 0:  # metabolic genes have duplicated entries in the dataset
            list_str = ", ".join(genes_duplicates_in_model)
            list_genes = "ERROR: Duplicate entries in the gene dataset appear in one or more GPRs. The following metabolic genes are duplicated: " + list_str
            raise ValueError(list_genes)
        else:
            list_str = ", ".join(genes_duplicates)
            list_genes = "INFO: Duplicate entries in the gene dataset. The following genes are duplicated in the dataset but not mentioned in any GPR: " + list_str
            utils.logWarning(list_genes, ARGS.out_log)

    # check whether NaN values must be ignored in the GPRs
    if ARGS.none:
        # e.g. (A or nan --> A)
        ignore_nan = True
    else:
        # e.g. (A or nan --> nan)
        ignore_nan = False

    # compute RAS
    ras_df, genes_not_mapped = computeRAS(dataset, rules,
                    rules_total_string,
                    or_function=np.sum,    # operation to apply for an OR expression (max, sum, mean)
                    and_function=np.min,
                    ignore_nan=ignore_nan)

    # save as a tab-separated file, replacing NaN with "None"
    ras_df.replace([np.nan, None], "None").to_csv(ARGS.ras_output, sep = '\t')

    # report genes mentioned in the GPRs but not present in the data
    if len(genes_not_mapped) > 0:
        genes_not_mapped_str = ", ".join(genes_not_mapped)
        utils.logWarning(
            "INFO: The following genes are mentioned in the GPR rules but don't appear in the dataset: " + genes_not_mapped_str,
            ARGS.out_log)

    print("Execution succeeded")

###############################################################################
if __name__ == "__main__":
    main()