diff COBRAxy/ras_generator.py @ 406:187cee1a00e2 draft

Uploaded
author francesco_lapi
date Mon, 08 Sep 2025 14:44:15 +0000
parents ccccb731c953
children
--- a/COBRAxy/ras_generator.py	Mon Sep 08 13:52:58 2025 +0000
+++ b/COBRAxy/ras_generator.py	Mon Sep 08 14:44:15 2025 +0000
@@ -27,10 +27,15 @@
         usage = '%(prog)s [options]',
         description = "process some value's genes to create a comparison's map.")
     
-    parser.add_argument("-rl", "--model_upload", type = str,
-        help = "path to input file containing the rules")
+    parser.add_argument(
+        '-rs', '--rules_selector', 
+        type = utils.Model, default = utils.Model.ENGRO2, choices = list(utils.Model),
+        help = 'choose which type of dataset you want to use')
+    
+    parser.add_argument("-rl", "--rule_list", type = str,
+        help = "path to input file with custom rules, if provided")
 
-    parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
+    parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
     # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
     
     parser.add_argument(
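A hedged usage sketch of the reworked options: only -rs, -rl and -rn are defined in this hunk, the expression-dataset and output flags are omitted here, and the exact spelling the utils.Model enum accepts on the command line is an assumption.

    # Built-in model: rules ship with the chosen model (other required flags omitted).
    main(["-rs", "ENGRO2"])

    # Custom rules: Galaxy stores the upload as .dat, so the original file name is
    # passed via -rn to recover its extension (pickle vs. tabular).
    main(["-rs", "Custom", "-rl", "rules.dat", "-rn", "my_rules.tsv"])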
@@ -518,8 +523,8 @@
     """
     ras_values_by_cell_line = {}
     dataset.set_index(dataset.columns[0], inplace=True)
-    
-    for cell_line_name in dataset.columns: #[1:]:
+# Consider all columns except the first, which holds the HUGO gene identifiers and is skipped
+    for cell_line_name in dataset.columns[1:]:
         cell_line = dataset[cell_line_name].to_dict()
         ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
     return ras_values_by_cell_line
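A minimal sketch of the input layout this loop assumes, with hypothetical column names: the first column carries HUGO gene identifiers and every following column is one cell line's expression profile.

    import pandas as pd

    dataset = pd.DataFrame({
        "HUGO_symbol": ["A1BG", "A2M", "NAT1"],   # gene identifiers (assumed header)
        "cell_line_A": [3.2, 0.7, 1.4],
        "cell_line_B": [1.1, 4.5, 0.0],
    })
    # ras_for_cell_lines(dataset, rules) sets the gene column as the index, then calls
    # get_ras_values(rules, ...) once per cell-line column it iterates, collecting the
    # results in a dict keyed by cell-line name.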
@@ -637,50 +642,16 @@
     Returns:
         Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
     """
-    datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in galaxy as a .dat
-
-    #try: filenamePath = utils.FilePath.fromStrPath(ARGS.model_upload_name) # file's name in input, to determine its original ext
-    #except utils.PathErr as err:      
-    #    utils.logWarning(f"Cannot determine file extension from filename '{ARGS.model_upload_name}'. Assuming tabular format.", ARGS.out_log)
-    #    filenamePath = None
+    datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
+    
+    try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
+    except utils.PathErr as err:
+        raise utils.PathErr(ARGS.rules_name, f"Please make sure your file's name is a valid file path, {err.msg}")
      
-    #if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
-
-    dict_rule = {}
+    if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
 
-    try:
-        # Try the tab delimiter first
-        for line in utils.readCsv(datFilePath, delimiter = "\t"):
-            if len(line) < 3:  # check that there are at least 3 columns
-                utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
-                continue
-            
-            if line[2] == "":
-                dict_rule[line[0]] = ruleUtils.OpList([""])
-            else:
-                dict_rule[line[0]] = ruleUtils.parseRuleToNestedList(line[2])
-                
-    except Exception as e:
-        # If tab fails, try the comma delimiter
-        try:
-            dict_rule = {}
-            for line in utils.readCsv(datFilePath, delimiter = ","):
-                if len(line) < 3:
-                    utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
-                    continue
-                
-                if line[2] == "":
-                    dict_rule[line[0]] = ruleUtils.OpList([""])
-                else:
-                    dict_rule[line[0]] = ruleUtils.parseRuleToNestedList(line[2])
-        except Exception as e2:
-            raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")
-
-    if not dict_rule:
-            raise ValueError("No valid rules found in the uploaded file. Please check the file format.")
     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
-    return dict_rule
-
+    return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
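The simplified loader expects each row of the tabular rules file to pair a reaction ID (column 0) with its gene rule string (column 1); pickled files are returned as-is. A hedged sketch of the resulting mapping, with hypothetical reaction IDs and gene symbols standing in for the utils.readCsv output:

    rows = [["R_HEX1", "HK1 or HK2"], ["R_PFK", "PFKL and PFKM"]]  # stand-in for utils.readCsv(datFilePath)
    rules = {row[0]: ruleUtils.parseRuleToNestedList(row[1]) for row in rows}
    # e.g. rules["R_HEX1"] is the parsed OpList for "HK1 or HK2"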
 
 def main(args:List[str] = None) -> None:
     """
@@ -700,46 +671,35 @@
     # remove versioning from gene names
     dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
 
-    rules = load_custom_rules()
-    reactions = list(rules.keys())
+    # handle custom models
+    model :utils.Model = ARGS.rules_selector
 
-    save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
-    if ERRORS: utils.logWarning(
-        f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
-        ARGS.out_log)  
-
-
-    ############
+    if model is utils.Model.Custom:
+        rules = load_custom_rules()
+        reactions = list(rules.keys())
 
-    # handle custom models
-    #model :utils.Model = ARGS.rules_selector
-
-    #if model is utils.Model.Custom:
-    #    rules = load_custom_rules()
-    #    reactions = list(rules.keys())
-
-    #    save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
-    #    if ERRORS: utils.logWarning(
-    #        f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
-    #        ARGS.out_log)
+        save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
+        if ERRORS: utils.logWarning(
+            f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
+            ARGS.out_log)
         
-    #    return
+        return
     
     # This is the standard flow of the ras_generator program, for non-custom models.
-    #name = "RAS Dataset"
-    #type_gene = gene_type(dataset.iloc[0, 0], name)
-
-    #rules      = model.getRules(ARGS.tool_dir)
-    #genes      = data_gene(dataset, type_gene, name, None)
-    #ids, rules = load_id_rules(rules.get(type_gene))
+    name = "RAS Dataset"
+    type_gene = gene_type(dataset.iloc[0, 0], name)
 
-    #resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
-    #create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
+    rules      = model.getRules(ARGS.tool_dir)
+    genes      = data_gene(dataset, type_gene, name, None)
+    ids, rules = load_id_rules(rules.get(type_gene))
     
-    #if err: utils.logWarning(
-    #    f"Warning: gene(s) {err} not found in class \"{name}\", " +
-    #    "the expression level for this gene will be considered NaN",
-    #    ARGS.out_log)
+    resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
+    create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
+    
+    if err: utils.logWarning(
+        f"Warning: gene(s) {err} not found in class \"{name}\", " +
+        "the expression level for this gene will be considered NaN",
+        ARGS.out_log)
     
     print("Execution succeded")