| 
489
 | 
     1 """
 | 
| 
 | 
     2 Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.
 | 
| 
 | 
     3 
 | 
| 
 | 
     4 The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
 | 
| 
 | 
     5 computes RAS per reaction for each sample/cell line, and writes a tabular output.
 | 
| 
 | 
     6 """
 | 
| 
93
 | 
     7 from __future__ import division
 | 
| 
 | 
     8 import sys
 | 
| 
 | 
     9 import argparse
 | 
| 
 | 
    10 import collections
 | 
| 
 | 
    11 import pandas as pd
 | 
| 
 | 
    12 import pickle as pk
 | 
| 
 | 
    13 import utils.general_utils as utils
 | 
| 
 | 
    14 import utils.rule_parsing as ruleUtils
 | 
| 
 | 
    15 from typing import Union, Optional, List, Dict, Tuple, TypeVar
 | 
| 
 | 
    16 
 | 
| 
 | 
# Names of genes that were looked up while scoring but were not found in the
# dataset; appended to by get_gene_expr and reported by main.
ERRORS = []
########################## argparse ##########################################
# Parsed command-line arguments; only declared here, assigned by main().
ARGS :argparse.Namespace
 | 
| 
147
 | 
    20 def process_args(args:List[str] = None) -> argparse.Namespace:
 | 
| 
93
 | 
    21     """
 | 
| 
 | 
    22     Processes command-line arguments.
 | 
| 
 | 
    23 
 | 
| 
 | 
    24     Args:
 | 
| 
 | 
    25         args (list): List of command-line arguments.
 | 
| 
 | 
    26 
 | 
| 
 | 
    27     Returns:
 | 
| 
 | 
    28         Namespace: An object containing parsed arguments.
 | 
| 
 | 
    29     """
 | 
| 
 | 
    30     parser = argparse.ArgumentParser(
 | 
| 
 | 
    31         usage = '%(prog)s [options]',
 | 
| 
 | 
    32         description = "process some value's genes to create a comparison's map.")
 | 
| 
 | 
    33     
 | 
| 
489
 | 
    34     parser.add_argument("-rl", "--model_upload", type = str,
 | 
| 
 | 
    35         help = "path to input file containing the rules")
 | 
| 
93
 | 
    36 
 | 
| 
489
 | 
    37     parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
 | 
| 
 | 
    38     # Galaxy converts files into .dat, this helps infer the original extension when needed.
 | 
| 
93
 | 
    39     
 | 
| 
 | 
    40     parser.add_argument(
 | 
| 
 | 
    41         '-n', '--none',
 | 
| 
 | 
    42         type = utils.Bool("none"), default = True,
 | 
| 
 | 
    43         help = 'compute Nan values')
 | 
| 
 | 
    44     
 | 
| 
 | 
    45     parser.add_argument(
 | 
| 
 | 
    46         '-td', '--tool_dir',
 | 
| 
 | 
    47         type = str,
 | 
| 
 | 
    48         required = True, help = 'your tool directory')
 | 
| 
 | 
    49     
 | 
| 
 | 
    50     parser.add_argument(
 | 
| 
 | 
    51         '-ol', '--out_log',
 | 
| 
 | 
    52         type = str,
 | 
| 
 | 
    53         help = "Output log")    
 | 
| 
 | 
    54     
 | 
| 
 | 
    55     parser.add_argument(
 | 
| 
489
 | 
    56         '-in', '--input',
 | 
| 
93
 | 
    57         type = str,
 | 
| 
 | 
    58         help = 'input dataset')
 | 
| 
 | 
    59     
 | 
| 
 | 
    60     parser.add_argument(
 | 
| 
 | 
    61         '-ra', '--ras_output',
 | 
| 
 | 
    62         type = str,
 | 
| 
 | 
    63         required = True, help = 'ras output')
 | 
| 
147
 | 
    64 
 | 
| 
93
 | 
    65     
 | 
| 
147
 | 
    66     return parser.parse_args(args)
 | 
| 
93
 | 
    67 
 | 
| 
 | 
    68 ############################ dataset input ####################################
 | 
| 
 | 
    69 def read_dataset(data :str, name :str) -> pd.DataFrame:
 | 
| 
 | 
    70     """
 | 
| 
 | 
    71     Read a dataset from a CSV file and return it as a pandas DataFrame.
 | 
| 
 | 
    72 
 | 
| 
 | 
    73     Args:
 | 
| 
 | 
    74         data (str): Path to the CSV file containing the dataset.
 | 
| 
 | 
    75         name (str): Name of the dataset, used in error messages.
 | 
| 
 | 
    76 
 | 
| 
 | 
    77     Returns:
 | 
| 
 | 
    78         pandas.DataFrame: DataFrame containing the dataset.
 | 
| 
 | 
    79 
 | 
| 
 | 
    80     Raises:
 | 
| 
 | 
    81         pd.errors.EmptyDataError: If the CSV file is empty.
 | 
| 
 | 
    82         sys.exit: If the CSV file has the wrong format, the execution is aborted.
 | 
| 
 | 
    83     """
 | 
| 
 | 
    84     try:
 | 
| 
 | 
    85         dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
 | 
| 
 | 
    86     except pd.errors.EmptyDataError:
 | 
| 
 | 
    87         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
    88     if len(dataset.columns) < 2:
 | 
| 
 | 
    89         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
    90     return dataset
 | 
| 
 | 
    91 
 | 
| 
 | 
    92 ############################ load id e rules ##################################
 | 
| 
 | 
    93 def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
 | 
| 
 | 
    94     """
 | 
| 
 | 
    95     Load IDs and rules from a dictionary of reactions.
 | 
| 
 | 
    96 
 | 
| 
 | 
    97     Args:
 | 
| 
 | 
    98         reactions (dict): A dictionary where keys are IDs and values are rules.
 | 
| 
 | 
    99 
 | 
| 
 | 
   100     Returns:
 | 
| 
 | 
   101         tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
 | 
| 
 | 
   102     """
 | 
| 
 | 
   103     ids, rules = [], []
 | 
| 
 | 
   104     for key, value in reactions.items():
 | 
| 
 | 
   105             ids.append(key)
 | 
| 
 | 
   106             rules.append(value)
 | 
| 
 | 
   107     return (ids, rules)
 | 
| 
 | 
   108 
 | 
| 
 | 
   109 
 | 
| 
 | 
   110 ############################ gene #############################################
 | 
| 
 | 
   111 def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
 | 
| 
 | 
   112     """
 | 
| 
 | 
   113     Process gene data to ensure correct formatting and handle duplicates.
 | 
| 
 | 
   114 
 | 
| 
 | 
   115     Args:
 | 
| 
 | 
   116         gene (DataFrame): DataFrame containing gene data.
 | 
| 
 | 
   117         type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
 | 
| 
 | 
   118         name (str): Name of the dataset.
 | 
| 
 | 
   119         gene_custom (dict or None): Custom gene data dictionary if provided.
 | 
| 
 | 
   120 
 | 
| 
 | 
   121     Returns:
 | 
| 
 | 
   122         dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
 | 
| 
 | 
   123     """
 | 
| 
309
 | 
   124  
 | 
| 
93
 | 
   125     for i in range(len(gene)):
 | 
| 
 | 
   126         tmp = gene.iloc[i, 0]
 | 
| 
 | 
   127         gene.iloc[i, 0] = tmp.strip().split('.')[0]
 | 
| 
 | 
   128 
 | 
| 
 | 
   129     gene_dup = [item for item, count in 
 | 
| 
 | 
   130                collections.Counter(gene[gene.columns[0]]).items() if count > 1]
 | 
| 
 | 
   131     pat_dup = [item for item, count in 
 | 
| 
 | 
   132                collections.Counter(list(gene.columns)).items() if count > 1]
 | 
| 
260
 | 
   133     
 | 
| 
 | 
   134     gene_in_rule = None
 | 
| 
259
 | 
   135 
 | 
| 
93
 | 
   136     if gene_dup:
 | 
| 
 | 
   137         if gene_custom == None:
 | 
| 
264
 | 
   138 
 | 
| 
309
 | 
   139             if str(ARGS.rules_selector) == 'HMRcore':
 | 
| 
 | 
   140                 gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
 | 
| 
93
 | 
   141             
 | 
| 
309
 | 
   142             elif str(ARGS.rules_selector) == 'Recon':
 | 
| 
 | 
   143                 gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
 | 
| 
93
 | 
   144             
 | 
| 
309
 | 
   145             elif str(ARGS.rules_selector) == 'ENGRO2':
 | 
| 
 | 
   146                 gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
 | 
| 
263
 | 
   147 
 | 
| 
309
 | 
   148             utils.logWarning(f"{ARGS.tool_dir}'/local/pickle files/ENGRO2_genes.p'", ARGS.out_log)
 | 
| 
259
 | 
   149 
 | 
| 
93
 | 
   150             gene_in_rule = gene_in_rule.get(type_gene)
 | 
| 
 | 
   151         
 | 
| 
 | 
   152         else:
 | 
| 
 | 
   153             gene_in_rule = gene_custom
 | 
| 
260
 | 
   154 
 | 
| 
93
 | 
   155         tmp = []
 | 
| 
 | 
   156         for i in gene_dup:
 | 
| 
 | 
   157             if gene_in_rule.get(i) == 'ok':
 | 
| 
 | 
   158                 tmp.append(i)
 | 
| 
 | 
   159         if tmp:
 | 
| 
 | 
   160             sys.exit('Execution aborted because gene ID '
 | 
| 
 | 
   161                      +str(tmp)+' in '+name+' is duplicated\n')
 | 
| 
 | 
   162     
 | 
| 
 | 
   163     if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
 | 
| 
 | 
   164     return (gene.set_index(gene.columns[0])).to_dict()
 | 
| 
 | 
   165 
 | 
| 
 | 
   166 ############################ resolve ##########################################
 | 
| 
 | 
   167 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
 | 
| 
 | 
   168     """
 | 
| 
489
 | 
   169     Replace gene identifiers in a parsed rule expression with values from a dict.
 | 
| 
93
 | 
   170 
 | 
| 
 | 
   171     Args:
 | 
| 
489
 | 
   172         l: Parsed rule as a nested list structure (strings, lists, and operators).
 | 
| 
 | 
   173         d: Dict mapping gene IDs to numeric values.
 | 
| 
93
 | 
   174 
 | 
| 
 | 
   175     Returns:
 | 
| 
489
 | 
   176         tuple: (new_expression, not_found_genes)
 | 
| 
93
 | 
   177     """
 | 
| 
 | 
   178     tmp = []
 | 
| 
 | 
   179     err = []
 | 
| 
 | 
   180     while l:
 | 
| 
 | 
   181         if isinstance(l[0], list):
 | 
| 
 | 
   182             tmp_rules, tmp_err = replace_gene_value(l[0], d)
 | 
| 
 | 
   183             tmp.append(tmp_rules)
 | 
| 
 | 
   184             err.extend(tmp_err)
 | 
| 
 | 
   185         else:
 | 
| 
 | 
   186             value = replace_gene(l[0], d)
 | 
| 
 | 
   187             tmp.append(value)
 | 
| 
 | 
   188             if value == None:
 | 
| 
 | 
   189                 err.append(l[0])
 | 
| 
 | 
   190         l = l[1:]
 | 
| 
 | 
   191     return (tmp, err)
 | 
| 
 | 
   192 
 | 
| 
489
 | 
   193 def replace_gene(l: str, d: Dict[str, Union[int, float]]) -> Union[int, float, None]:
 | 
| 
93
 | 
   194     """
 | 
| 
 | 
   195     Replace a single gene identifier with its corresponding value from a dictionary.
 | 
| 
 | 
   196 
 | 
| 
 | 
   197     Args:
 | 
| 
 | 
   198         l (str): Gene identifier to replace.
 | 
| 
489
 | 
   199         d (dict): Dict mapping gene IDs to numeric values.
 | 
| 
93
 | 
   200 
 | 
| 
 | 
   201     Returns:
 | 
| 
489
 | 
   202         float/int/None: Corresponding value from the dictionary if found, None otherwise.
 | 
| 
93
 | 
   203 
 | 
| 
 | 
   204     Raises:
 | 
| 
 | 
   205         sys.exit: If the value associated with the gene identifier is not valid.
 | 
| 
 | 
   206     """
 | 
| 
 | 
   207     if l =='and' or l == 'or':
 | 
| 
 | 
   208         return l
 | 
| 
 | 
   209     else:
 | 
| 
 | 
   210         value = d.get(l, None)
 | 
| 
 | 
   211         if not(value == None or isinstance(value, (int, float))):
 | 
| 
 | 
   212             sys.exit('Execution aborted: ' + value + ' value not valid\n')
 | 
| 
 | 
   213         return value
 | 
| 
 | 
   214 
 | 
| 
 | 
   215 T = TypeVar("T", bound = Optional[Union[int, float]])
 | 
| 
 | 
   216 def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
 | 
| 
 | 
   217     """
 | 
| 
 | 
   218     Compute the RAS value between two value and an operator ('and' or 'or').
 | 
| 
 | 
   219 
 | 
| 
 | 
   220     Args:
 | 
| 
 | 
   221         val1(Optional(Union[float, int])): First value.
 | 
| 
 | 
   222         op (str): Operator ('and' or 'or').
 | 
| 
 | 
   223         val2(Optional(Union[float, int])): Second value.
 | 
| 
 | 
   224         cn (bool): Control boolean value.
 | 
| 
 | 
   225 
 | 
| 
 | 
   226     Returns:
 | 
| 
 | 
   227         Optional(Union[float, int]): Result of the computation.
 | 
| 
 | 
   228     """
 | 
| 
 | 
   229     if val1 != None and val2 != None:
 | 
| 
 | 
   230         if op == 'and':
 | 
| 
 | 
   231             return min(val1, val2)
 | 
| 
 | 
   232         else:
 | 
| 
 | 
   233             return val1 + val2
 | 
| 
 | 
   234     elif op == 'and':
 | 
| 
 | 
   235         if cn is True:
 | 
| 
 | 
   236             if val1 != None:
 | 
| 
 | 
   237                 return val1
 | 
| 
 | 
   238             elif val2 != None:
 | 
| 
 | 
   239                 return val2
 | 
| 
 | 
   240             else:
 | 
| 
 | 
   241                 return None
 | 
| 
 | 
   242         else:
 | 
| 
 | 
   243             return None
 | 
| 
 | 
   244     else:
 | 
| 
 | 
   245         if val1 != None:
 | 
| 
 | 
   246             return val1
 | 
| 
 | 
   247         elif val2 != None:
 | 
| 
 | 
   248             return val2
 | 
| 
 | 
   249         else:
 | 
| 
 | 
   250             return None
 | 
| 
 | 
   251 
 | 
| 
 | 
   252 # ris should be Literal[None] but Literal is not supported in Python 3.7
 | 
| 
 | 
   253 def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
 | 
| 
 | 
   254     """
 | 
| 
 | 
   255     Control the format of the expression.
 | 
| 
 | 
   256 
 | 
| 
 | 
   257     Args:
 | 
| 
 | 
   258         ris: Intermediate result.
 | 
| 
 | 
   259         l (list): Expression to control.
 | 
| 
 | 
   260         cn (bool): Control boolean value.
 | 
| 
 | 
   261 
 | 
| 
 | 
   262     Returns:
 | 
| 
 | 
   263         Union[Literal[False], int, float]: Result of the control.
 | 
| 
 | 
   264     """
 | 
| 
 | 
   265     if len(l) == 1:
 | 
| 
 | 
   266         if isinstance(l[0], (float, int)) or l[0] == None:
 | 
| 
 | 
   267             return l[0]
 | 
| 
 | 
   268         elif isinstance(l[0], list):
 | 
| 
 | 
   269             return control(None, l[0], cn)
 | 
| 
 | 
   270         else:
 | 
| 
 | 
   271             return False
 | 
| 
 | 
   272     elif len(l) > 2:
 | 
| 
 | 
   273         return control_list(ris, l, cn)
 | 
| 
 | 
   274     else:
 | 
| 
 | 
   275         return False
 | 
| 
 | 
   276 
 | 
| 
 | 
   277 def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
 | 
| 
 | 
   278     """
 | 
| 
 | 
   279     Control the format of a list of expressions.
 | 
| 
 | 
   280 
 | 
| 
 | 
   281     Args:
 | 
| 
 | 
   282         ris: Intermediate result.
 | 
| 
 | 
   283         l (list): List of expressions to control.
 | 
| 
 | 
   284         cn (bool): Control boolean value.
 | 
| 
 | 
   285 
 | 
| 
 | 
   286     Returns:
 | 
| 
 | 
   287         Optional[Literal[False]]: Result of the control.
 | 
| 
 | 
   288     """
 | 
| 
 | 
   289     while l:
 | 
| 
 | 
   290         if len(l) == 1:
 | 
| 
 | 
   291             return False
 | 
| 
 | 
   292         elif (isinstance(l[0], (float, int)) or
 | 
| 
 | 
   293               l[0] == None) and l[1] in ['and', 'or']:
 | 
| 
 | 
   294             if isinstance(l[2], (float, int)) or l[2] == None:
 | 
| 
 | 
   295                 ris = computes(l[0], l[1], l[2], cn)            
 | 
| 
 | 
   296             elif isinstance(l[2], list):
 | 
| 
 | 
   297                 tmp = control(None, l[2], cn)
 | 
| 
 | 
   298                 if tmp is False:
 | 
| 
 | 
   299                     return False
 | 
| 
 | 
   300                 else:
 | 
| 
 | 
   301                     ris = computes(l[0], l[1], tmp, cn)
 | 
| 
 | 
   302             else:
 | 
| 
 | 
   303                 return False
 | 
| 
 | 
   304             l = l[3:]
 | 
| 
 | 
   305         elif l[0] in ['and', 'or']:
 | 
| 
 | 
   306             if isinstance(l[1], (float, int)) or l[1] == None:
 | 
| 
 | 
   307                 ris = computes(ris, l[0], l[1], cn)
 | 
| 
 | 
   308             elif isinstance(l[1], list):
 | 
| 
 | 
   309                 tmp = control(None,l[1], cn)
 | 
| 
 | 
   310                 if tmp is False:
 | 
| 
 | 
   311                     return False
 | 
| 
 | 
   312                 else:
 | 
| 
 | 
   313                     ris = computes(ris, l[0], tmp, cn)
 | 
| 
 | 
   314             else:
 | 
| 
 | 
   315                 return False
 | 
| 
 | 
   316             l = l[2:]
 | 
| 
 | 
   317         elif isinstance(l[0], list) and l[1] in ['and', 'or']:
 | 
| 
 | 
   318             if isinstance(l[2], (float, int)) or l[2] == None:
 | 
| 
 | 
   319                 tmp = control(None, l[0], cn)
 | 
| 
 | 
   320                 if tmp is False:
 | 
| 
 | 
   321                     return False
 | 
| 
 | 
   322                 else:
 | 
| 
 | 
   323                     ris = computes(tmp, l[1], l[2], cn)
 | 
| 
 | 
   324             elif isinstance(l[2], list):
 | 
| 
 | 
   325                 tmp = control(None, l[0], cn)
 | 
| 
 | 
   326                 tmp2 = control(None, l[2], cn)
 | 
| 
 | 
   327                 if tmp is False or tmp2 is False:
 | 
| 
 | 
   328                     return False
 | 
| 
 | 
   329                 else:
 | 
| 
 | 
   330                     ris = computes(tmp, l[1], tmp2, cn)
 | 
| 
 | 
   331             else:
 | 
| 
 | 
   332                 return False
 | 
| 
 | 
   333             l = l[3:]
 | 
| 
 | 
   334         else:
 | 
| 
 | 
   335             return False
 | 
| 
 | 
   336     return ris
 | 
| 
 | 
   337 
 | 
| 
 | 
   338 ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
 | 
| 
 | 
   339 def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
 | 
| 
 | 
   340     """
 | 
| 
 | 
   341     Resolve rules using gene data to compute scores for each rule.
 | 
| 
 | 
   342 
 | 
| 
 | 
   343     Args:
 | 
| 
 | 
   344         genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
 | 
| 
 | 
   345         rules (list): List of rules to resolve.
 | 
| 
 | 
   346         ids (list): List of IDs corresponding to the rules.
 | 
| 
 | 
   347         resolve_none (bool): Flag indicating whether to resolve None values in the rules.
 | 
| 
 | 
   348         name (str): Name of the dataset.
 | 
| 
 | 
   349 
 | 
| 
 | 
   350     Returns:
 | 
| 
 | 
   351         tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
 | 
| 
 | 
   352     """
 | 
| 
 | 
   353     resolve_rules = {}
 | 
| 
 | 
   354     not_found = []
 | 
| 
 | 
   355     flag = False
 | 
| 
 | 
   356     for key, value in genes.items():
 | 
| 
 | 
   357         tmp_resolve = []
 | 
| 
 | 
   358         for i in range(len(rules)):
 | 
| 
 | 
   359             tmp = rules[i]
 | 
| 
 | 
   360             if tmp:
 | 
| 
 | 
   361                 tmp, err = replace_gene_value(tmp, value)
 | 
| 
 | 
   362                 if err:
 | 
| 
 | 
   363                     not_found.extend(err)
 | 
| 
 | 
   364                 ris = control(None, tmp, resolve_none)
 | 
| 
 | 
   365                 if ris is False or ris == None:
 | 
| 
 | 
   366                     tmp_resolve.append(None)
 | 
| 
 | 
   367                 else:
 | 
| 
 | 
   368                     tmp_resolve.append(ris)
 | 
| 
 | 
   369                     flag = True
 | 
| 
 | 
   370             else:
 | 
| 
 | 
   371                 tmp_resolve.append(None)    
 | 
| 
 | 
   372         resolve_rules[key] = tmp_resolve
 | 
| 
 | 
   373     
 | 
| 
 | 
   374     if flag is False:
 | 
| 
 | 
   375         utils.logWarning(
 | 
| 
 | 
   376             f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
 | 
| 
 | 
   377             ARGS.out_log)
 | 
| 
 | 
   378         
 | 
| 
 | 
   379         return (None, None)
 | 
| 
 | 
   380     
 | 
| 
 | 
   381     return (resolve_rules, list(set(not_found)))
 | 
| 
 | 
   382 ############################ create_ras #######################################
 | 
| 
 | 
   383 def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
 | 
| 
 | 
   384     """
 | 
| 
 | 
   385     Create a RAS (Reaction Activity Score) file from resolved rules.
 | 
| 
 | 
   386 
 | 
| 
 | 
   387     Args:
 | 
| 
 | 
   388         resolve_rules (dict): Dictionary containing resolved rules.
 | 
| 
 | 
   389         dataset_name (str): Name of the dataset.
 | 
| 
 | 
   390         rules (list): List of rules.
 | 
| 
 | 
   391         file (str): Path to the output RAS file.
 | 
| 
 | 
   392 
 | 
| 
 | 
   393     Returns:
 | 
| 
 | 
   394         None
 | 
| 
 | 
   395     """
 | 
| 
 | 
   396     if resolve_rules is None:
 | 
| 
 | 
   397         utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
 | 
| 
 | 
   398 
 | 
| 
 | 
   399     for geni in resolve_rules.values():
 | 
| 
 | 
   400         for i, valori in enumerate(geni):
 | 
| 
 | 
   401             if valori == None:
 | 
| 
 | 
   402                 geni[i] = 'None'
 | 
| 
 | 
   403                 
 | 
| 
 | 
   404     output_ras = pd.DataFrame.from_dict(resolve_rules)
 | 
| 
 | 
   405     
 | 
| 
 | 
   406     output_ras.insert(0, 'Reactions', ids)
 | 
| 
 | 
   407     output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
 | 
| 
 | 
   408     
 | 
| 
 | 
   409     text_file = open(file, "w")
 | 
| 
 | 
   410     
 | 
| 
 | 
   411     text_file.write(output_to_csv)
 | 
| 
 | 
   412     text_file.close()
 | 
| 
 | 
   413 
 | 
| 
 | 
   414 ################################- NEW RAS COMPUTATION -################################
 | 
| 
 | 
   415 Expr = Optional[Union[int, float]]
 | 
| 
 | 
   416 Ras  = Expr
 | 
| 
 | 
   417 def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
 | 
| 
 | 
   418     """
 | 
| 
 | 
   419     Generates the RAS scores for each cell line found in the dataset.
 | 
| 
 | 
   420 
 | 
| 
 | 
   421     Args:
 | 
| 
 | 
   422         dataset (pd.DataFrame): Dataset containing gene values.
 | 
| 
 | 
   423         rules (dict): The dict containing reaction ids as keys and rules as values.
 | 
| 
489
 | 
   424     
 | 
| 
 | 
   425     Note:
 | 
| 
 | 
   426         Modifies dataset in place by setting the first column as index.
 | 
| 
93
 | 
   427     
 | 
| 
 | 
   428     Returns:
 | 
| 
 | 
   429         dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
 | 
| 
 | 
   430         where each key corresponds to a reaction ID and each value is its computed RAS score.
 | 
| 
 | 
   431     """
 | 
| 
 | 
   432     ras_values_by_cell_line = {}
 | 
| 
 | 
   433     dataset.set_index(dataset.columns[0], inplace=True)
 | 
| 
489
 | 
   434     
 | 
| 
 | 
   435     for cell_line_name in dataset.columns: #[1:]:
 | 
| 
93
 | 
   436         cell_line = dataset[cell_line_name].to_dict()
 | 
| 
 | 
   437         ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
 | 
| 
 | 
   438     return ras_values_by_cell_line
 | 
| 
 | 
   439 
 | 
| 
 | 
   440 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
 | 
| 
 | 
   441     """
 | 
| 
 | 
   442     Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
 | 
| 
 | 
   443 
 | 
| 
 | 
   444     Args:
 | 
| 
 | 
   445         value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
 | 
| 
 | 
   446         dataset : gene expression data of one cell line.
 | 
| 
 | 
   447 
 | 
| 
 | 
   448     Returns:
 | 
| 
 | 
   449         dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
 | 
| 
 | 
   450     """
 | 
| 
 | 
   451     return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
 | 
| 
 | 
   452 
 | 
| 
 | 
   453 def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
 | 
| 
 | 
   454     """
 | 
| 
 | 
   455     Extracts the gene expression of the given gene from a cell line dataset.
 | 
| 
 | 
   456 
 | 
| 
 | 
   457     Args:
 | 
| 
 | 
   458         dataset : gene expression data of one cell line.
 | 
| 
 | 
   459         name : gene name.
 | 
| 
 | 
   460     
 | 
| 
 | 
   461     Returns:
 | 
| 
 | 
   462         Expr : the gene's expression value.
 | 
| 
 | 
   463     """
 | 
| 
 | 
   464     expr = dataset.get(name, None)
 | 
| 
 | 
   465     if expr is None: ERRORS.append(name)
 | 
| 
 | 
   466   
 | 
| 
 | 
   467     return expr
 | 
| 
 | 
   468 
 | 
| 
 | 
def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
    """
    Computes recursively the RAS (Reaction Activity Score) value for the given OpList.

    Items joined by OR are summed, items joined by any other operator are
    combined with min. Missing values (None) are skipped, unless the operator
    is AND and ARGS.none is false, in which case the whole result is None.

    Args:
        op_list (OpList): The OpList representing a rule with gene values.
        dataset : gene expression data of one cell line.

    Returns:
        Ras: The computed RAS value for the given OpList, or None when no
        item could be evaluated.
    """
    op = op_list.op
    ras_value :Ras = None
    # No operator: the rule is treated as a single gene, looked up directly.
    # NOTE(review): assumes an operator-less OpList always has an element at
    # index 0 -- confirm against ruleUtils.OpList.
    if not op: return get_gene_expr(dataset, op_list[0])
    # Early exit for a strict AND whose list already contains a literal None.
    if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None

    for i in range(len(op_list)):
        item = op_list[i]
        if isinstance(item, ruleUtils.OpList):
            # Nested sub-rule: evaluate it recursively.
            item = ras_op_list(item, dataset)

        else:
          # Plain gene name: replace it with its expression value.
          item = get_gene_expr(dataset, item)

        if item is None:
          # A missing operand invalidates a strict AND; otherwise it is ignored.
          if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
          continue

        if ras_value is None:
          # First available operand seeds the accumulator.
          ras_value = item
        else:
          ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)

    return ras_value
 | 
| 
 | 
   503 
 | 
| 
 | 
   504 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
 | 
| 
 | 
   505     """
 | 
| 
489
 | 
   506     Save computed RAS scores to ARGS.ras_output as a TSV file.
 | 
| 
93
 | 
   507 
 | 
| 
 | 
   508     Args:
 | 
| 
 | 
   509         rasScores : the computed ras scores.
 | 
| 
489
 | 
   510         reactions : the list of reaction IDs, used as the first column.
 | 
| 
93
 | 
   511     
 | 
| 
 | 
   512     Returns:
 | 
| 
 | 
   513         None
 | 
| 
 | 
   514     """
 | 
| 
 | 
   515     for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
 | 
| 
 | 
   516         for reactId, score in scores.items():
 | 
| 
 | 
   517             if score is None: scores[reactId] = "None"
 | 
| 
 | 
   518 
 | 
| 
 | 
   519     output_ras = pd.DataFrame.from_dict(rasScores)
 | 
| 
 | 
   520     output_ras.insert(0, 'Reactions', reactions)
 | 
| 
 | 
   521     output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
 | 
| 
 | 
   522 
 | 
| 
 | 
   523 ############################ MAIN #############################################
 | 
| 
 | 
   524 #TODO: not used but keep, it will be when the new translator dicts will be used.
 | 
| 
 | 
   525 def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
 | 
| 
 | 
   526     """
 | 
| 
 | 
   527     Translate gene from any supported encoding to HugoID.
 | 
| 
 | 
   528 
 | 
| 
 | 
   529     Args:
 | 
| 
 | 
   530         geneName (str): the name of the gene in its current encoding.
 | 
| 
 | 
   531         encoding (str): the encoding.
 | 
| 
 | 
   532         geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
 | 
| 
 | 
   533         and encodings in the current model, mapping each to the corresponding HugoID encoding.
 | 
| 
 | 
   534 
 | 
| 
 | 
   535     Raises:
 | 
| 
 | 
   536         ValueError: When the gene isn't supported in the model.
 | 
| 
 | 
   537 
 | 
| 
 | 
   538     Returns:
 | 
| 
 | 
   539         str: the gene in HugoID encoding.
 | 
| 
 | 
   540     """
 | 
| 
 | 
   541     supportedGenesInEncoding = geneTranslator[encoding]
 | 
| 
 | 
   542     if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
 | 
| 
489
 | 
   543     raise ValueError(f"Gene '{geneName}' not found. Please verify you are using the correct model.")
 | 
| 
93
 | 
   544 
 | 
| 
 | 
   545 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
 | 
| 
 | 
   546     """
 | 
| 
 | 
   547     Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
 | 
| 
 | 
   548     performed, significantly impacting the runtime.
 | 
| 
 | 
   549 
 | 
| 
 | 
   550     Returns:
 | 
| 
 | 
   551         Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
 | 
| 
 | 
   552     """
 | 
| 
489
 | 
   553     datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat
 | 
| 
 | 
   554 
 | 
| 
 | 
   555     dict_rule = {}
 | 
| 
 | 
   556 
 | 
| 
 | 
   557     try:
 | 
| 
 | 
   558         rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False)
 | 
| 
 | 
   559         if len(rows) <= 1:
 | 
| 
 | 
   560             raise ValueError("Model tabular with 1 column is not supported.")
 | 
| 
381
 | 
   561 
 | 
| 
489
 | 
   562         if not rows:
 | 
| 
 | 
   563             raise ValueError("Model tabular is file is empty.")
 | 
| 
 | 
   564         
 | 
| 
 | 
   565         id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
 | 
| 
 | 
   566         
 | 
| 
 | 
   567     # First, try using a tab delimiter
 | 
| 
 | 
   568         for line in rows[1:]:
 | 
| 
 | 
   569             if len(line) <= idx_gpr:
 | 
| 
 | 
   570                 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
 | 
| 
 | 
   571                 continue
 | 
| 
 | 
   572             
 | 
| 
 | 
   573             if line[idx_gpr] == "":
 | 
| 
 | 
   574                 dict_rule[line[id_idx]] = ruleUtils.OpList([""])
 | 
| 
 | 
   575             else:
 | 
| 
 | 
   576                 dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])
 | 
| 
 | 
   577                 
 | 
| 
 | 
   578     except Exception as e:
 | 
| 
 | 
   579         # If parsing with tabs fails, try comma delimiter
 | 
| 
 | 
   580         try:
 | 
| 
 | 
   581             rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)
 | 
| 
 | 
   582             
 | 
| 
 | 
   583             if len(rows) <= 1:
 | 
| 
 | 
   584                 raise ValueError("Model tabular with 1 column is not supported.")
 | 
| 
 | 
   585 
 | 
| 
 | 
   586             if not rows:
 | 
| 
 | 
   587                 raise ValueError("Model tabular is file is empty.")
 | 
| 
 | 
   588             
 | 
| 
 | 
   589             id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
 | 
| 
 | 
   590             
 | 
| 
 | 
   591             # Try again parsing row content with the GPR column using comma-separated values
 | 
| 
 | 
   592             for line in rows[1:]:
 | 
| 
 | 
   593                 if len(line) <= idx_gpr:
 | 
| 
 | 
   594                     utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
 | 
| 
 | 
   595                     continue
 | 
| 
 | 
   596                 
 | 
| 
 | 
   597                 if line[idx_gpr] == "":
 | 
| 
 | 
   598                     dict_rule[line[id_idx]] = ruleUtils.OpList([""])
 | 
| 
 | 
   599                 else:
 | 
| 
 | 
   600                     dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])
 | 
| 
 | 
   601                     
 | 
| 
 | 
   602         except Exception as e2:
 | 
| 
 | 
   603             raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")
 | 
| 
 | 
   604 
 | 
| 
 | 
   605     if not dict_rule:
 | 
| 
 | 
   606             raise ValueError("No valid rules found in the uploaded file. Please check the file format.")
 | 
| 
93
 | 
   607     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
 | 
| 
489
 | 
   608     return dict_rule
 | 
| 
 | 
   609 
 | 
| 
401
 | 
   610 
 | 
| 
147
 | 
   611 def main(args:List[str] = None) -> None:
 | 
| 
93
 | 
   612     """
 | 
| 
 | 
   613     Initializes everything and sets the program in motion based on the fronted input arguments.
 | 
| 
 | 
   614     
 | 
| 
 | 
   615     Returns:
 | 
| 
 | 
   616         None
 | 
| 
 | 
   617     """
 | 
| 
 | 
   618     # get args from frontend (related xml)
 | 
| 
 | 
   619     global ARGS
 | 
| 
147
 | 
   620     ARGS = process_args(args)
 | 
| 
309
 | 
   621 
 | 
| 
93
 | 
   622     # read dataset
 | 
| 
 | 
   623     dataset = read_dataset(ARGS.input, "dataset")
 | 
| 
 | 
   624     dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
 | 
| 
 | 
   625 
 | 
| 
 | 
   626     # remove versioning from gene names
 | 
| 
 | 
   627     dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
 | 
| 
 | 
   628 
 | 
| 
489
 | 
   629     rules = load_custom_rules()
 | 
| 
 | 
   630     reactions = list(rules.keys())
 | 
| 
93
 | 
   631 
 | 
| 
489
 | 
   632     save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
 | 
| 
 | 
   633     if ERRORS: utils.logWarning(
 | 
| 
 | 
   634         f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
 | 
| 
 | 
   635         ARGS.out_log)  
 | 
| 
381
 | 
   636 
 | 
| 
489
 | 
   637 
 | 
| 
 | 
   638     print("Execution succeeded")
 | 
| 
93
 | 
   639 
 | 
| 
 | 
   640 ###############################################################################
 | 
| 
 | 
# Run the tool only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
 |