changeset 143:507efdc9d226 draft

Uploaded
author luca_milaz
date Tue, 05 Nov 2024 21:42:17 +0000 (2 months ago)
parents accda943dfb9
children a9a490ae198d
files COBRAxy/marea.py
diffstat 1 files changed, 40 insertions(+), 58 deletions(-) [+]
line wrap: on
line diff
--- a/COBRAxy/marea.py	Thu Oct 31 20:54:58 2024 +0000
+++ b/COBRAxy/marea.py	Tue Nov 05 21:42:17 2024 +0000
@@ -15,6 +15,7 @@
 import argparse
 import pyvips
 from typing import Tuple, Union, Optional, List, Dict
+import copy
 
 ERRORS = []
 ########################## argparse ##########################################
@@ -761,7 +762,7 @@
     
     return tmp, max_z_score
 
-def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str], *, fromRAS = True) -> None:
+def computeEnrichment(metabMap: ET.ElementTree, class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]:
     """
     Compares clustered data based on a given comparison mode and applies enrichment-based styling on the
     provided metabolic map.
@@ -773,58 +774,52 @@
         fromRAS : whether the data to enrich consists of RAS scores.
 
     Returns:
-        None
-
+        List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score.
+        
     Raises:
         sys.exit : if there are less than 2 classes for comparison
     
     Side effects:
-        metabMap : mut
-        ids : mut
+        metabMap : mutates based on calculated enrichment
     """
-    class_pat = { k.strip() : v for k, v in class_pat.items() }
-    #TODO: simplfy this stuff vvv and stop using sys.exit (raise the correct utils error)
-    if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: classes provided for comparisons are less than two\n')
+    class_pat = {k.strip(): v for k, v in class_pat.items()}
+    if (not class_pat) or (len(class_pat.keys()) < 2):
+        sys.exit('Execution aborted: classes provided for comparisons are less than two\n')
+    
+    enrichment_results = []
 
     if ARGS.comparison == "manyvsmany":
         for i, j in it.combinations(class_pat.keys(), 2):
-            #TODO: these 2 functions are always called in pair and in this order and need common data,
-            # some clever refactoring would be appreciated.
             comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
-            temp_thingsInCommon(comparisonDict, metabMap, max_z_score, i, j, fromRAS)
+            enrichment_results.append((i, j, comparisonDict, max_z_score))
     
     elif ARGS.comparison == "onevsrest":
         for single_cluster in class_pat.keys():
-            t :List[List[List[float]]] = []
-            for k in class_pat.keys():
-                if k != single_cluster:
-                   t.append(class_pat.get(k))
-            
-            rest :List[List[float]] = []
-            for i in t:
-                rest = rest + i
-            
+            rest = [item for k, v in class_pat.items() if k != single_cluster for item in v]
             comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
-            temp_thingsInCommon(comparisonDict, metabMap, max_z_score, single_cluster, fromRAS)
+            enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score))
     
     elif ARGS.comparison == "onevsmany":
         controlItems = class_pat.get(ARGS.control)
         for otherDataset in class_pat.keys():
-            if otherDataset == ARGS.control: continue
-            
+            if otherDataset == ARGS.control:
+                continue
             comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
-            temp_thingsInCommon(comparisonDict, metabMap, max_z_score, ARGS.control, otherDataset, fromRAS)
+            enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score))
+    
+    return enrichment_results
 
-def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
-    svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG)
+def createOutputMaps(dataset1Name: str, dataset2Name: str, core_map: ET.ElementTree) -> None:
+    svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG)
     utils.writeSvg(svgFilePath, core_map)
 
     if ARGS.generate_pdf:
-        pngPath = buildOutputPath(dataset1Name, dataset2Name, details = "PNG Map", ext = utils.FileFormat.PNG)
-        pdfPath = buildOutputPath(dataset1Name, dataset2Name, details = "PDF Map", ext = utils.FileFormat.PDF)
-        convert_to_pdf(svgFilePath, pngPath, pdfPath)                     
+        pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG)
+        pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF)
+        convert_to_pdf(svgFilePath, pngPath, pdfPath)
 
-    if not ARGS.generate_svg: os.remove(svgFilePath.show())
+    if not ARGS.generate_svg:
+        os.remove(svgFilePath)
 
 ClassPat = Dict[str, List[List[float]]]
 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
@@ -880,46 +875,33 @@
     Raises:
         sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
     """
-
     global ARGS
     ARGS = process_args()
 
-    if os.path.isdir('result') == False: os.makedirs('result')
+    if not os.path.isdir('result'):
+        os.makedirs('result')
     
-    core_map :ET.ElementTree = ARGS.choice_map.getMap(
+    core_map: ET.ElementTree = ARGS.choice_map.getMap(
         ARGS.tool_dir,
         utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
-    # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given.
-    # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner
-    # solution can be derived from my comment in FilePath.fromStrPath
-
+    
     if ARGS.using_RAS:
         ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names)
-        computeEnrichment(core_map, class_pat, ids)
+        enrichment_results = computeEnrichment(core_map, class_pat, ids)
+        for i, j, comparisonDict, max_z_score in enrichment_results:
+            map_copy = copy.deepcopy(core_map)
+            temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j, fromRAS=True)
+            createOutputMaps(i, j, map_copy)
     
     if ARGS.using_RPS:
         ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps)
-        computeEnrichment(core_map, class_pat, ids, fromRAS = False)
-    
-    # create output files: TODO: this is the same comparison happening in "maps", find a better way to organize this
-    if ARGS.comparison == "manyvsmany":
-        for i, j in it.combinations(class_pat.keys(), 2): createOutputMaps(i, j, core_map)
-        return
-    
-    if ARGS.comparison == "onevsrest":
-        for single_cluster in class_pat.keys(): createOutputMaps(single_cluster, "rest", core_map)
-        return
-    
-    for otherDataset in class_pat.keys():
-        if otherDataset != ARGS.control: createOutputMaps(i, j, core_map)
+        enrichment_results = computeEnrichment(core_map, class_pat, ids, fromRAS=False)
+        for i, j, comparisonDict, max_z_score in enrichment_results:
+            map_copy = copy.deepcopy(core_map)
+            temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j, fromRAS=False)
+            createOutputMaps(i, j, map_copy)
 
-    if not ERRORS: return
-    utils.logWarning(
-        f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}",
-        ARGS.out_log)
-    
-    print('Execution succeded')
-
+    print('Execution succeeded')
 ###############################################################################
 if __name__ == "__main__":
     main()
\ No newline at end of file