Mercurial > repos > bimib > cobraxy
diff COBRAxy/ras_generator.py @ 406:187cee1a00e2 draft
Uploaded
author | francesco_lapi |
---|---|
date | Mon, 08 Sep 2025 14:44:15 +0000 |
parents | ccccb731c953 |
children |
line wrap: on
line diff
--- a/COBRAxy/ras_generator.py Mon Sep 08 13:52:58 2025 +0000 +++ b/COBRAxy/ras_generator.py Mon Sep 08 14:44:15 2025 +0000 @@ -27,10 +27,15 @@ usage = '%(prog)s [options]', description = "process some value's genes to create a comparison's map.") - parser.add_argument("-rl", "--model_upload", type = str, - help = "path to input file containing the rules") + parser.add_argument( + '-rs', '--rules_selector', + type = utils.Model, default = utils.Model.ENGRO2, choices = list(utils.Model), + help = 'chose which type of dataset you want use') + + parser.add_argument("-rl", "--rule_list", type = str, + help = "path to input file with custom rules, if provided") - parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name") + parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name") # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in parser.add_argument( @@ -518,8 +523,8 @@ """ ras_values_by_cell_line = {} dataset.set_index(dataset.columns[0], inplace=True) - - for cell_line_name in dataset.columns: #[1:]: + # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata + for cell_line_name in dataset.columns[1:]: cell_line = dataset[cell_line_name].to_dict() ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line) return ras_values_by_cell_line @@ -637,50 +642,16 @@ Returns: Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. """ - datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in galaxy as a .dat - - #try: filenamePath = utils.FilePath.fromStrPath(ARGS.model_upload_name) # file's name in input, to determine its original ext - #except utils.PathErr as err: - # utils.logWarning(f"Cannot determine file extension from filename '{ARGS.model_upload_name}'. Assuming tabular format.", ARGS.out_log) - # filenamePath = None + datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat + + try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext + except utils.PathErr as err: + raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}") - #if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath) - - dict_rule = {} + if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath) - try: - # Proviamo prima con delimitatore tab - for line in utils.readCsv(datFilePath, delimiter = "\t"): - if len(line) < 3: # Controlliamo che ci siano almeno 3 colonne - utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) - continue - - if line[2] == "": - dict_rule[line[0]] = ruleUtils.OpList([""]) - else: - dict_rule[line[0]] = ruleUtils.parseRuleToNestedList(line[2]) - - except Exception as e: - # Se fallisce con tab, proviamo con virgola - try: - dict_rule = {} - for line in utils.readCsv(datFilePath, delimiter = ","): - if len(line) < 3: - utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) - continue - - if line[2] == "": - dict_rule[line[0]] = ruleUtils.OpList([""]) - else: - dict_rule[line[0]] = ruleUtils.parseRuleToNestedList(line[2]) - except Exception as e2: - raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}") - - if not dict_rule: - raise ValueError("No valid rules found in the uploaded file. Please check the file format.") # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. - return dict_rule - + return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) } def main(args:List[str] = None) -> None: """ @@ -700,46 +671,35 @@ # remove versioning from gene names dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0] - rules = load_custom_rules() - reactions = list(rules.keys()) + # handle custom models + model :utils.Model = ARGS.rules_selector - save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) - if ERRORS: utils.logWarning( - f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", - ARGS.out_log) - - - ############ + if model is utils.Model.Custom: + rules = load_custom_rules() + reactions = list(rules.keys()) - # handle custom models - #model :utils.Model = ARGS.rules_selector - - #if model is utils.Model.Custom: - # rules = load_custom_rules() - # reactions = list(rules.keys()) - - # save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) - # if ERRORS: utils.logWarning( - # f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", - # ARGS.out_log) + save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) + if ERRORS: utils.logWarning( + f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", + ARGS.out_log) - # return + return # This is the standard flow of the ras_generator program, for non-custom models. - #name = "RAS Dataset" - #type_gene = gene_type(dataset.iloc[0, 0], name) - - #rules = model.getRules(ARGS.tool_dir) - #genes = data_gene(dataset, type_gene, name, None) - #ids, rules = load_id_rules(rules.get(type_gene)) + name = "RAS Dataset" + type_gene = gene_type(dataset.iloc[0, 0], name) - #resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name) - #create_ras(resolve_rules, name, rules, ids, ARGS.ras_output) + rules = model.getRules(ARGS.tool_dir) + genes = data_gene(dataset, type_gene, name, None) + ids, rules = load_id_rules(rules.get(type_gene)) - #if err: utils.logWarning( - # f"Warning: gene(s) {err} not found in class \"{name}\", " + - # "the expression level for this gene will be considered NaN", - # ARGS.out_log) + resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name) + create_ras(resolve_rules, name, rules, ids, ARGS.ras_output) + + if err: utils.logWarning( + f"Warning: gene(s) {err} not found in class \"{name}\", " + + "the expression level for this gene will be considered NaN", + ARGS.out_log) print("Execution succeded")