# HG changeset patch # User luca_milaz # Date 1722888258 0 # Node ID 1434e851f92929e80fd5dea32e91f5f18d6ef8c2 # Parent 92a6764f05f267c19839e632b47d89cc15a32b31 Uploaded diff -r 92a6764f05f2 -r 1434e851f929 marea_2/ras_generator.py --- a/marea_2/ras_generator.py Mon Aug 05 20:04:01 2024 +0000 +++ b/marea_2/ras_generator.py Mon Aug 05 20:04:18 2024 +0000 @@ -1,713 +1,699 @@ -from __future__ import division -# galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason. -import sys -import argparse -import collections -import pandas as pd -import pickle as pk -import utils.general_utils as utils -import utils.rule_parsing as ruleUtils -from typing import Union, Optional, List, Dict, Tuple, TypeVar - -ERRORS = [] -########################## argparse ########################################## -ARGS :argparse.Namespace -def process_args() -> argparse.Namespace: - """ - Processes command-line arguments. - - Args: - args (list): List of command-line arguments. - - Returns: - Namespace: An object containing parsed arguments. - """ - parser = argparse.ArgumentParser( - usage = '%(prog)s [options]', - description = "process some value's genes to create a comparison's map.") - - parser.add_argument( - '-rs', '--rules_selector', - type = utils.Model, default = utils.Model.HMRcore, choices = list(utils.Model), - help = 'chose which type of dataset you want use') - - parser.add_argument("-rl", "--rule_list", type = str, - help = "path to input file with custom rules, if provided") - - parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name") - # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in - - parser.add_argument( - '-n', '--none', - type = utils.Bool("none"), default = True, - help = 'compute Nan values') - - parser.add_argument( - '-td', '--tool_dir', - type = str, - required = True, help = 'your tool directory') - - parser.add_argument( - '-ol', '--out_log', - type = str, - help = "Output log") - - parser.add_argument( - '-in', '--input', #id è diventato in - type = str, - help = 'input dataset') - - parser.add_argument( - '-ra', '--ras_output', - type = str, - required = True, help = 'ras output') - - return parser.parse_args() - -############################ dataset input #################################### -def read_dataset(data :str, name :str) -> pd.DataFrame: - """ - Read a dataset from a CSV file and return it as a pandas DataFrame. - - Args: - data (str): Path to the CSV file containing the dataset. - name (str): Name of the dataset, used in error messages. - - Returns: - pandas.DataFrame: DataFrame containing the dataset. - - Raises: - pd.errors.EmptyDataError: If the CSV file is empty. - sys.exit: If the CSV file has the wrong format, the execution is aborted. - """ - try: - dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python') - except pd.errors.EmptyDataError: - sys.exit('Execution aborted: wrong format of ' + name + '\n') - if len(dataset.columns) < 2: - sys.exit('Execution aborted: wrong format of ' + name + '\n') - return dataset - -############################ load id e rules ################################## -def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]: - """ - Load IDs and rules from a dictionary of reactions. - - Args: - reactions (dict): A dictionary where keys are IDs and values are rules. - - Returns: - tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules. - """ - ids, rules = [], [] - for key, value in reactions.items(): - ids.append(key) - rules.append(value) - return (ids, rules) - -############################ check_methods #################################### -def gene_type(l :str, name :str) -> str: - """ - Determine the type of gene ID. - - Args: - l (str): The gene identifier to check. - name (str): The name of the dataset, used in error messages. - - Returns: - str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id'). - - Raises: - sys.exit: If the gene ID type is not supported, the execution is aborted. - """ - if check_hgnc(l): - return 'hugo_id' - elif check_ensembl(l): - return 'ensembl_gene_id' - elif check_symbol(l): - return 'symbol' - elif check_entrez(l): - return 'entrez_id' - else: - sys.exit('Execution aborted:\n' + - 'gene ID type in ' + name + ' not supported. Supported ID'+ - 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n') - -def check_hgnc(l :str) -> bool: - """ - Check if a gene identifier follows the HGNC format. - - Args: - l (str): The gene identifier to check. - - Returns: - bool: True if the gene identifier follows the HGNC format, False otherwise. - """ - if len(l) > 5: - if (l.upper()).startswith('HGNC:'): - return l[5:].isdigit() - else: - return False - else: - return False - -def check_ensembl(l :str) -> bool: - """ - Check if a gene identifier follows the Ensembl format. - - Args: - l (str): The gene identifier to check. - - Returns: - bool: True if the gene identifier follows the Ensembl format, False otherwise. - """ - return l.upper().startswith('ENS') - - -def check_symbol(l :str) -> bool: - """ - Check if a gene identifier follows the symbol format. - - Args: - l (str): The gene identifier to check. - - Returns: - bool: True if the gene identifier follows the symbol format, False otherwise. - """ - if len(l) > 0: - if l[0].isalpha() and l[1:].isalnum(): - return True - else: - return False - else: - return False - -def check_entrez(l :str) -> bool: - """ - Check if a gene identifier follows the Entrez ID format. - - Args: - l (str): The gene identifier to check. - - Returns: - bool: True if the gene identifier follows the Entrez ID format, False otherwise. - """ - if len(l) > 0: - return l.isdigit() - else: - return False - -############################ gene ############################################# -def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]: - """ - Process gene data to ensure correct formatting and handle duplicates. - - Args: - gene (DataFrame): DataFrame containing gene data. - type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id'). - name (str): Name of the dataset. - gene_custom (dict or None): Custom gene data dictionary if provided. - - Returns: - dict: A dictionary containing gene data with gene IDs as keys and corresponding values. - """ - args = process_args() - for i in range(len(gene)): - tmp = gene.iloc[i, 0] - gene.iloc[i, 0] = tmp.strip().split('.')[0] - - gene_dup = [item for item, count in - collections.Counter(gene[gene.columns[0]]).items() if count > 1] - pat_dup = [item for item, count in - collections.Counter(list(gene.columns)).items() if count > 1] - - if gene_dup: - if gene_custom == None: - if args.rules_selector == 'HMRcore': - gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb')) - - elif args.rules_selector == 'Recon': - gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/Recon_genes.p', 'rb')) - - elif args.rules_selector == 'ENGRO2': - gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb')) - - gene_in_rule = gene_in_rule.get(type_gene) - - else: - gene_in_rule = gene_custom - tmp = [] - for i in gene_dup: - if gene_in_rule.get(i) == 'ok': - tmp.append(i) - if tmp: - sys.exit('Execution aborted because gene ID ' - +str(tmp)+' in '+name+' is duplicated\n') - - if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log) - return (gene.set_index(gene.columns[0])).to_dict() - -############################ resolve ########################################## -def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]: - """ - Replace gene identifiers with corresponding values from a dictionary. - - Args: - l (str): String of gene identifier. - d (str): String corresponding to its value. - - Returns: - tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement. - """ - tmp = [] - err = [] - while l: - if isinstance(l[0], list): - tmp_rules, tmp_err = replace_gene_value(l[0], d) - tmp.append(tmp_rules) - err.extend(tmp_err) - else: - value = replace_gene(l[0], d) - tmp.append(value) - if value == None: - err.append(l[0]) - l = l[1:] - return (tmp, err) - -def replace_gene(l :str, d :str) -> Union[int, float]: - """ - Replace a single gene identifier with its corresponding value from a dictionary. - - Args: - l (str): Gene identifier to replace. - d (str): String corresponding to its value. - - Returns: - float/int: Corresponding value from the dictionary if found, None otherwise. - - Raises: - sys.exit: If the value associated with the gene identifier is not valid. - """ - if l =='and' or l == 'or': - return l - else: - value = d.get(l, None) - if not(value == None or isinstance(value, (int, float))): - sys.exit('Execution aborted: ' + value + ' value not valid\n') - return value - -T = TypeVar("T", bound = Optional[Union[int, float]]) -def computes(val1 :T, op :str, val2 :T, cn :bool) -> T: - """ - Compute the RAS value between two value and an operator ('and' or 'or'). - - Args: - val1(Optional(Union[float, int])): First value. - op (str): Operator ('and' or 'or'). - val2(Optional(Union[float, int])): Second value. - cn (bool): Control boolean value. - - Returns: - Optional(Union[float, int]): Result of the computation. - """ - if val1 != None and val2 != None: - if op == 'and': - return min(val1, val2) - else: - return val1 + val2 - elif op == 'and': - if cn is True: - if val1 != None: - return val1 - elif val2 != None: - return val2 - else: - return None - else: - return None - else: - if val1 != None: - return val1 - elif val2 != None: - return val2 - else: - return None - -# ris should be Literal[None] but Literal is not supported in Python 3.7 -def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]: - """ - Control the format of the expression. - - Args: - ris: Intermediate result. - l (list): Expression to control. - cn (bool): Control boolean value. - - Returns: - Union[Literal[False], int, float]: Result of the control. - """ - if len(l) == 1: - if isinstance(l[0], (float, int)) or l[0] == None: - return l[0] - elif isinstance(l[0], list): - return control(None, l[0], cn) - else: - return False - elif len(l) > 2: - return control_list(ris, l, cn) - else: - return False - -def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]: - """ - Control the format of a list of expressions. - - Args: - ris: Intermediate result. - l (list): List of expressions to control. - cn (bool): Control boolean value. - - Returns: - Optional[Literal[False]]: Result of the control. - """ - while l: - if len(l) == 1: - return False - elif (isinstance(l[0], (float, int)) or - l[0] == None) and l[1] in ['and', 'or']: - if isinstance(l[2], (float, int)) or l[2] == None: - ris = computes(l[0], l[1], l[2], cn) - elif isinstance(l[2], list): - tmp = control(None, l[2], cn) - if tmp is False: - return False - else: - ris = computes(l[0], l[1], tmp, cn) - else: - return False - l = l[3:] - elif l[0] in ['and', 'or']: - if isinstance(l[1], (float, int)) or l[1] == None: - ris = computes(ris, l[0], l[1], cn) - elif isinstance(l[1], list): - tmp = control(None,l[1], cn) - if tmp is False: - return False - else: - ris = computes(ris, l[0], tmp, cn) - else: - return False - l = l[2:] - elif isinstance(l[0], list) and l[1] in ['and', 'or']: - if isinstance(l[2], (float, int)) or l[2] == None: - tmp = control(None, l[0], cn) - if tmp is False: - return False - else: - ris = computes(tmp, l[1], l[2], cn) - elif isinstance(l[2], list): - tmp = control(None, l[0], cn) - tmp2 = control(None, l[2], cn) - if tmp is False or tmp2 is False: - return False - else: - ris = computes(tmp, l[1], tmp2, cn) - else: - return False - l = l[3:] - else: - return False - return ris - -ResolvedRules = Dict[str, List[Optional[Union[float, int]]]] -def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]: - """ - Resolve rules using gene data to compute scores for each rule. - - Args: - genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values. - rules (list): List of rules to resolve. - ids (list): List of IDs corresponding to the rules. - resolve_none (bool): Flag indicating whether to resolve None values in the rules. - name (str): Name of the dataset. - - Returns: - tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data. - """ - resolve_rules = {} - not_found = [] - flag = False - for key, value in genes.items(): - tmp_resolve = [] - for i in range(len(rules)): - tmp = rules[i] - if tmp: - tmp, err = replace_gene_value(tmp, value) - if err: - not_found.extend(err) - ris = control(None, tmp, resolve_none) - if ris is False or ris == None: - tmp_resolve.append(None) - else: - tmp_resolve.append(ris) - flag = True - else: - tmp_resolve.append(None) - resolve_rules[key] = tmp_resolve - - if flag is False: - utils.logWarning( - f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded", - ARGS.out_log) - - return (None, None) - - return (resolve_rules, list(set(not_found))) -############################ create_ras ####################################### -def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None: - """ - Create a RAS (Reaction Activity Score) file from resolved rules. - - Args: - resolve_rules (dict): Dictionary containing resolved rules. - dataset_name (str): Name of the dataset. - rules (list): List of rules. - file (str): Path to the output RAS file. - - Returns: - None - """ - if resolve_rules is None: - utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log) - - for geni in resolve_rules.values(): - for i, valori in enumerate(geni): - if valori == None: - geni[i] = 'None' - - output_ras = pd.DataFrame.from_dict(resolve_rules) - - output_ras.insert(0, 'Reactions', ids) - output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False) - - text_file = open(file, "w") - - text_file.write(output_to_csv) - text_file.close() - -################################- NEW RAS COMPUTATION -################################ -Expr = Optional[Union[int, float]] -Ras = Expr -def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]: - """ - Generates the RAS scores for each cell line found in the dataset. - - Args: - dataset (pd.DataFrame): Dataset containing gene values. - rules (dict): The dict containing reaction ids as keys and rules as values. - - Side effects: - dataset : mut - - Returns: - dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary - where each key corresponds to a reaction ID and each value is its computed RAS score. - """ - ras_values_by_cell_line = {} - dataset.set_index(dataset.columns[0], inplace=True) - # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata - for cell_line_name in dataset.columns[1:]: - cell_line = dataset[cell_line_name].to_dict() - ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line) - return ras_values_by_cell_line - -def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]: - """ - Computes the RAS (Reaction Activity Score) values for each rule in the given dict. - - Args: - value_rules (dict): A dictionary where keys are reaction ids and values are OpLists. - dataset : gene expression data of one cell line. - - Returns: - dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule. - """ - return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()} - -def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr: - """ - Extracts the gene expression of the given gene from a cell line dataset. - - Args: - dataset : gene expression data of one cell line. - name : gene name. - - Returns: - Expr : the gene's expression value. - """ - expr = dataset.get(name, None) - if expr is None: ERRORS.append(name) - - return expr - -def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras: - """ - Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior. - - Args: - op_list (OpList): The OpList representing a rule with gene values. - dataset : gene expression data of one cell line. - - Returns: - Ras: The computed RAS value for the given OpList. - """ - op = op_list.op - ras_value :Ras = None - if not op: return get_gene_expr(dataset, op_list[0]) - if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None - - for i in range(len(op_list)): - item = op_list[i] - if isinstance(item, ruleUtils.OpList): - item = ras_op_list(item, dataset) - - else: - item = get_gene_expr(dataset, item) - - if item is None: - if op is ruleUtils.RuleOp.AND and not ARGS.none: return None - continue - - if ras_value is None: - ras_value = item - else: - ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item) - - return ras_value - -def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None: - """ - Save computed ras scores to the given path, as a tsv file. - - Args: - rasScores : the computed ras scores. - path : the output tsv file's path. - - Returns: - None - """ - for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly - for reactId, score in scores.items(): - if score is None: scores[reactId] = "None" - - output_ras = pd.DataFrame.from_dict(rasScores) - output_ras.insert(0, 'Reactions', reactions) - output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False) - -############################ MAIN ############################################# -#TODO: not used but keep, it will be when the new translator dicts will be used. -def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str: - """ - Translate gene from any supported encoding to HugoID. - - Args: - geneName (str): the name of the gene in its current encoding. - encoding (str): the encoding. - geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names - and encodings in the current model, mapping each to the corresponding HugoID encoding. - - Raises: - ValueError: When the gene isn't supported in the model. - - Returns: - str: the gene in HugoID encoding. - """ - supportedGenesInEncoding = geneTranslator[encoding] - if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName] - raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!") - -def load_custom_rules2() -> Dict[str, ruleUtils.OpList]: - """ - Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be - performed, significantly impacting the runtime. - - Returns: - Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. - """ - datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat - - try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext - except utils.PathErr as err: - raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}") - - - if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath) - - # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. - return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) } - -def load_custom_rules() -> Dict[str, ruleUtils.OpList]: - """ - Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be - performed, significantly impacting the runtime. - - Returns: - Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. - """ - dataset = pd.read_csv(ARGS.rule_list, sep = '\t', header = 0, engine='python') - - # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. - return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in dataset } - -def main() -> None: - """ - Initializes everything and sets the program in motion based on the fronted input arguments. - - Returns: - None - """ - # get args from frontend (related xml) - global ARGS - ARGS = process_args() - - # read dataset - dataset = read_dataset(ARGS.input, "dataset") - dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str) - - # remove versioning from gene names - dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0] - - # handle custom models - model :utils.Model = ARGS.rules_selector - if model is utils.Model.Custom: - rules = load_custom_rules() - reactions = list(rules.keys()) - - save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) - if ERRORS: utils.logWarning( - f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", - ARGS.out_log) - - return - - # This is the standard flow of the ras_generator program, for non-custom models. - name = "RAS Dataset" - type_gene = gene_type(dataset.iloc[0, 0], name) - - rules = model.getRules(ARGS.tool_dir) - genes = data_gene(dataset, type_gene, name, None) - ids, rules = load_id_rules(rules.get(type_gene)) - - resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name) - create_ras(resolve_rules, name, rules, ids, ARGS.ras_output) - - if err: utils.logWarning( - f"Warning: gene(s) {err} not found in class \"{name}\", " + - "the expression level for this gene will be considered NaN", - ARGS.out_log) - - print("Execution succeded") - -############################################################################### -if __name__ == "__main__": +from __future__ import division +# galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason. +import sys +import argparse +import collections +import pandas as pd +import pickle as pk +import utils.general_utils as utils +import utils.rule_parsing as ruleUtils +from typing import Union, Optional, List, Dict, Tuple, TypeVar + +ERRORS = [] +########################## argparse ########################################## +ARGS :argparse.Namespace +def process_args() -> argparse.Namespace: + """ + Processes command-line arguments. + + Args: + args (list): List of command-line arguments. + + Returns: + Namespace: An object containing parsed arguments. + """ + parser = argparse.ArgumentParser( + usage = '%(prog)s [options]', + description = "process some value's genes to create a comparison's map.") + + parser.add_argument( + '-rs', '--rules_selector', + type = utils.Model, default = utils.Model.HMRcore, choices = list(utils.Model), + help = 'chose which type of dataset you want use') + + parser.add_argument("-rl", "--rule_list", type = str, + help = "path to input file with custom rules, if provided") + + parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name") + # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in + + parser.add_argument( + '-n', '--none', + type = utils.Bool("none"), default = True, + help = 'compute Nan values') + + parser.add_argument( + '-td', '--tool_dir', + type = str, + required = True, help = 'your tool directory') + + parser.add_argument( + '-ol', '--out_log', + type = str, + help = "Output log") + + parser.add_argument( + '-in', '--input', #id è diventato in + type = str, + help = 'input dataset') + + parser.add_argument( + '-ra', '--ras_output', + type = str, + required = True, help = 'ras output') + + return parser.parse_args() + +############################ dataset input #################################### +def read_dataset(data :str, name :str) -> pd.DataFrame: + """ + Read a dataset from a CSV file and return it as a pandas DataFrame. + + Args: + data (str): Path to the CSV file containing the dataset. + name (str): Name of the dataset, used in error messages. + + Returns: + pandas.DataFrame: DataFrame containing the dataset. + + Raises: + pd.errors.EmptyDataError: If the CSV file is empty. + sys.exit: If the CSV file has the wrong format, the execution is aborted. + """ + try: + dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python') + except pd.errors.EmptyDataError: + sys.exit('Execution aborted: wrong format of ' + name + '\n') + if len(dataset.columns) < 2: + sys.exit('Execution aborted: wrong format of ' + name + '\n') + return dataset + +############################ load id e rules ################################## +def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]: + """ + Load IDs and rules from a dictionary of reactions. + + Args: + reactions (dict): A dictionary where keys are IDs and values are rules. + + Returns: + tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules. + """ + ids, rules = [], [] + for key, value in reactions.items(): + ids.append(key) + rules.append(value) + return (ids, rules) + +############################ check_methods #################################### +def gene_type(l :str, name :str) -> str: + """ + Determine the type of gene ID. + + Args: + l (str): The gene identifier to check. + name (str): The name of the dataset, used in error messages. + + Returns: + str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id'). + + Raises: + sys.exit: If the gene ID type is not supported, the execution is aborted. + """ + if check_hgnc(l): + return 'hugo_id' + elif check_ensembl(l): + return 'ensembl_gene_id' + elif check_symbol(l): + return 'symbol' + elif check_entrez(l): + return 'entrez_id' + else: + sys.exit('Execution aborted:\n' + + 'gene ID type in ' + name + ' not supported. Supported ID'+ + 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n') + +def check_hgnc(l :str) -> bool: + """ + Check if a gene identifier follows the HGNC format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the HGNC format, False otherwise. + """ + if len(l) > 5: + if (l.upper()).startswith('HGNC:'): + return l[5:].isdigit() + else: + return False + else: + return False + +def check_ensembl(l :str) -> bool: + """ + Check if a gene identifier follows the Ensembl format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the Ensembl format, False otherwise. + """ + return l.upper().startswith('ENS') + + +def check_symbol(l :str) -> bool: + """ + Check if a gene identifier follows the symbol format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the symbol format, False otherwise. + """ + if len(l) > 0: + if l[0].isalpha() and l[1:].isalnum(): + return True + else: + return False + else: + return False + +def check_entrez(l :str) -> bool: + """ + Check if a gene identifier follows the Entrez ID format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the Entrez ID format, False otherwise. + """ + if len(l) > 0: + return l.isdigit() + else: + return False + +############################ gene ############################################# +def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]: + """ + Process gene data to ensure correct formatting and handle duplicates. + + Args: + gene (DataFrame): DataFrame containing gene data. + type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id'). + name (str): Name of the dataset. + gene_custom (dict or None): Custom gene data dictionary if provided. + + Returns: + dict: A dictionary containing gene data with gene IDs as keys and corresponding values. + """ + args = process_args() + for i in range(len(gene)): + tmp = gene.iloc[i, 0] + gene.iloc[i, 0] = tmp.strip().split('.')[0] + + gene_dup = [item for item, count in + collections.Counter(gene[gene.columns[0]]).items() if count > 1] + pat_dup = [item for item, count in + collections.Counter(list(gene.columns)).items() if count > 1] + + if gene_dup: + if gene_custom == None: + if args.rules_selector == 'HMRcore': + gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb')) + + elif args.rules_selector == 'Recon': + gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/Recon_genes.p', 'rb')) + + elif args.rules_selector == 'ENGRO2': + gene_in_rule = pk.load(open(args.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb')) + + gene_in_rule = gene_in_rule.get(type_gene) + + else: + gene_in_rule = gene_custom + tmp = [] + for i in gene_dup: + if gene_in_rule.get(i) == 'ok': + tmp.append(i) + if tmp: + sys.exit('Execution aborted because gene ID ' + +str(tmp)+' in '+name+' is duplicated\n') + + if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log) + return (gene.set_index(gene.columns[0])).to_dict() + +############################ resolve ########################################## +def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]: + """ + Replace gene identifiers with corresponding values from a dictionary. + + Args: + l (str): String of gene identifier. + d (str): String corresponding to its value. + + Returns: + tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement. + """ + tmp = [] + err = [] + while l: + if isinstance(l[0], list): + tmp_rules, tmp_err = replace_gene_value(l[0], d) + tmp.append(tmp_rules) + err.extend(tmp_err) + else: + value = replace_gene(l[0], d) + tmp.append(value) + if value == None: + err.append(l[0]) + l = l[1:] + return (tmp, err) + +def replace_gene(l :str, d :str) -> Union[int, float]: + """ + Replace a single gene identifier with its corresponding value from a dictionary. + + Args: + l (str): Gene identifier to replace. + d (str): String corresponding to its value. + + Returns: + float/int: Corresponding value from the dictionary if found, None otherwise. + + Raises: + sys.exit: If the value associated with the gene identifier is not valid. + """ + if l =='and' or l == 'or': + return l + else: + value = d.get(l, None) + if not(value == None or isinstance(value, (int, float))): + sys.exit('Execution aborted: ' + value + ' value not valid\n') + return value + +T = TypeVar("T", bound = Optional[Union[int, float]]) +def computes(val1 :T, op :str, val2 :T, cn :bool) -> T: + """ + Compute the RAS value between two value and an operator ('and' or 'or'). + + Args: + val1(Optional(Union[float, int])): First value. + op (str): Operator ('and' or 'or'). + val2(Optional(Union[float, int])): Second value. + cn (bool): Control boolean value. + + Returns: + Optional(Union[float, int]): Result of the computation. + """ + if val1 != None and val2 != None: + if op == 'and': + return min(val1, val2) + else: + return val1 + val2 + elif op == 'and': + if cn is True: + if val1 != None: + return val1 + elif val2 != None: + return val2 + else: + return None + else: + return None + else: + if val1 != None: + return val1 + elif val2 != None: + return val2 + else: + return None + +# ris should be Literal[None] but Literal is not supported in Python 3.7 +def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]: + """ + Control the format of the expression. + + Args: + ris: Intermediate result. + l (list): Expression to control. + cn (bool): Control boolean value. + + Returns: + Union[Literal[False], int, float]: Result of the control. + """ + if len(l) == 1: + if isinstance(l[0], (float, int)) or l[0] == None: + return l[0] + elif isinstance(l[0], list): + return control(None, l[0], cn) + else: + return False + elif len(l) > 2: + return control_list(ris, l, cn) + else: + return False + +def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]: + """ + Control the format of a list of expressions. + + Args: + ris: Intermediate result. + l (list): List of expressions to control. + cn (bool): Control boolean value. + + Returns: + Optional[Literal[False]]: Result of the control. + """ + while l: + if len(l) == 1: + return False + elif (isinstance(l[0], (float, int)) or + l[0] == None) and l[1] in ['and', 'or']: + if isinstance(l[2], (float, int)) or l[2] == None: + ris = computes(l[0], l[1], l[2], cn) + elif isinstance(l[2], list): + tmp = control(None, l[2], cn) + if tmp is False: + return False + else: + ris = computes(l[0], l[1], tmp, cn) + else: + return False + l = l[3:] + elif l[0] in ['and', 'or']: + if isinstance(l[1], (float, int)) or l[1] == None: + ris = computes(ris, l[0], l[1], cn) + elif isinstance(l[1], list): + tmp = control(None,l[1], cn) + if tmp is False: + return False + else: + ris = computes(ris, l[0], tmp, cn) + else: + return False + l = l[2:] + elif isinstance(l[0], list) and l[1] in ['and', 'or']: + if isinstance(l[2], (float, int)) or l[2] == None: + tmp = control(None, l[0], cn) + if tmp is False: + return False + else: + ris = computes(tmp, l[1], l[2], cn) + elif isinstance(l[2], list): + tmp = control(None, l[0], cn) + tmp2 = control(None, l[2], cn) + if tmp is False or tmp2 is False: + return False + else: + ris = computes(tmp, l[1], tmp2, cn) + else: + return False + l = l[3:] + else: + return False + return ris + +ResolvedRules = Dict[str, List[Optional[Union[float, int]]]] +def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]: + """ + Resolve rules using gene data to compute scores for each rule. + + Args: + genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values. + rules (list): List of rules to resolve. + ids (list): List of IDs corresponding to the rules. + resolve_none (bool): Flag indicating whether to resolve None values in the rules. + name (str): Name of the dataset. + + Returns: + tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data. + """ + resolve_rules = {} + not_found = [] + flag = False + for key, value in genes.items(): + tmp_resolve = [] + for i in range(len(rules)): + tmp = rules[i] + if tmp: + tmp, err = replace_gene_value(tmp, value) + if err: + not_found.extend(err) + ris = control(None, tmp, resolve_none) + if ris is False or ris == None: + tmp_resolve.append(None) + else: + tmp_resolve.append(ris) + flag = True + else: + tmp_resolve.append(None) + resolve_rules[key] = tmp_resolve + + if flag is False: + utils.logWarning( + f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded", + ARGS.out_log) + + return (None, None) + + return (resolve_rules, list(set(not_found))) +############################ create_ras ####################################### +def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None: + """ + Create a RAS (Reaction Activity Score) file from resolved rules. + + Args: + resolve_rules (dict): Dictionary containing resolved rules. + dataset_name (str): Name of the dataset. + rules (list): List of rules. + file (str): Path to the output RAS file. + + Returns: + None + """ + if resolve_rules is None: + utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log) + + for geni in resolve_rules.values(): + for i, valori in enumerate(geni): + if valori == None: + geni[i] = 'None' + + output_ras = pd.DataFrame.from_dict(resolve_rules) + + output_ras.insert(0, 'Reactions', ids) + output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False) + + text_file = open(file, "w") + + text_file.write(output_to_csv) + text_file.close() + +################################- NEW RAS COMPUTATION -################################ +Expr = Optional[Union[int, float]] +Ras = Expr +def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]: + """ + Generates the RAS scores for each cell line found in the dataset. + + Args: + dataset (pd.DataFrame): Dataset containing gene values. + rules (dict): The dict containing reaction ids as keys and rules as values. + + Side effects: + dataset : mut + + Returns: + dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary + where each key corresponds to a reaction ID and each value is its computed RAS score. + """ + ras_values_by_cell_line = {} + dataset.set_index(dataset.columns[0], inplace=True) + # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata + for cell_line_name in dataset.columns[1:]: + cell_line = dataset[cell_line_name].to_dict() + ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line) + return ras_values_by_cell_line + +def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]: + """ + Computes the RAS (Reaction Activity Score) values for each rule in the given dict. + + Args: + value_rules (dict): A dictionary where keys are reaction ids and values are OpLists. + dataset : gene expression data of one cell line. + + Returns: + dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule. + """ + return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()} + +def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr: + """ + Extracts the gene expression of the given gene from a cell line dataset. + + Args: + dataset : gene expression data of one cell line. + name : gene name. + + Returns: + Expr : the gene's expression value. + """ + expr = dataset.get(name, None) + if expr is None: ERRORS.append(name) + + return expr + +def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras: + """ + Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior. + + Args: + op_list (OpList): The OpList representing a rule with gene values. + dataset : gene expression data of one cell line. + + Returns: + Ras: The computed RAS value for the given OpList. + """ + op = op_list.op + ras_value :Ras = None + if not op: return get_gene_expr(dataset, op_list[0]) + if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None + + for i in range(len(op_list)): + item = op_list[i] + if isinstance(item, ruleUtils.OpList): + item = ras_op_list(item, dataset) + + else: + item = get_gene_expr(dataset, item) + + if item is None: + if op is ruleUtils.RuleOp.AND and not ARGS.none: return None + continue + + if ras_value is None: + ras_value = item + else: + ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item) + + return ras_value + +def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None: + """ + Save computed ras scores to the given path, as a tsv file. + + Args: + rasScores : the computed ras scores. + path : the output tsv file's path. + + Returns: + None + """ + for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly + for reactId, score in scores.items(): + if score is None: scores[reactId] = "None" + + output_ras = pd.DataFrame.from_dict(rasScores) + output_ras.insert(0, 'Reactions', reactions) + output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False) + +############################ MAIN ############################################# +#TODO: not used but keep, it will be when the new translator dicts will be used. +def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str: + """ + Translate gene from any supported encoding to HugoID. + + Args: + geneName (str): the name of the gene in its current encoding. + encoding (str): the encoding. + geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names + and encodings in the current model, mapping each to the corresponding HugoID encoding. + + Raises: + ValueError: When the gene isn't supported in the model. + + Returns: + str: the gene in HugoID encoding. + """ + supportedGenesInEncoding = geneTranslator[encoding] + if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName] + raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!") + +def load_custom_rules() -> Dict[str, ruleUtils.OpList]: + """ + Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be + performed, significantly impacting the runtime. + + Returns: + Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. + """ + datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat + + try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext + except utils.PathErr as err: + raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}") + + if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath) + + # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. + return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) } + +def main() -> None: + """ + Initializes everything and sets the program in motion based on the fronted input arguments. + + Returns: + None + """ + # get args from frontend (related xml) + global ARGS + ARGS = process_args() + + # read dataset + dataset = read_dataset(ARGS.input, "dataset") + dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str) + + # remove versioning from gene names + dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0] + + # handle custom models + model :utils.Model = ARGS.rules_selector + if model is utils.Model.Custom: + rules = load_custom_rules() + reactions = list(rules.keys()) + + save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) + if ERRORS: utils.logWarning( + f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", + ARGS.out_log) + + return + + # This is the standard flow of the ras_generator program, for non-custom models. + name = "RAS Dataset" + type_gene = gene_type(dataset.iloc[0, 0], name) + + rules = model.getRules(ARGS.tool_dir) + genes = data_gene(dataset, type_gene, name, None) + ids, rules = load_id_rules(rules.get(type_gene)) + + resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name) + create_ras(resolve_rules, name, rules, ids, ARGS.ras_output) + + if err: utils.logWarning( + f"Warning: gene(s) {err} not found in class \"{name}\", " + + "the expression level for this gene will be considered NaN", + ARGS.out_log) + + print("Execution succeded") + +############################################################################### +if __name__ == "__main__": main() \ No newline at end of file