Mercurial > repos > bimib > cobraxy
comparison COBRAxy/marea.py @ 143:507efdc9d226 draft
Uploaded
author | luca_milaz |
---|---|
date | Tue, 05 Nov 2024 21:42:17 +0000 |
parents | 41f35c2f0c7b |
children | a9a490ae198d |
comparison
equal
deleted
inserted
replaced
142:accda943dfb9 | 143:507efdc9d226 |
---|---|
13 from PIL import Image | 13 from PIL import Image |
14 import os | 14 import os |
15 import argparse | 15 import argparse |
16 import pyvips | 16 import pyvips |
17 from typing import Tuple, Union, Optional, List, Dict | 17 from typing import Tuple, Union, Optional, List, Dict |
18 import copy | |
18 | 19 |
19 ERRORS = [] | 20 ERRORS = [] |
20 ########################## argparse ########################################## | 21 ########################## argparse ########################################## |
21 ARGS :argparse.Namespace | 22 ARGS :argparse.Namespace |
22 def process_args() -> argparse.Namespace: | 23 def process_args() -> argparse.Namespace: |
759 | 760 |
760 except (TypeError, ZeroDivisionError): continue | 761 except (TypeError, ZeroDivisionError): continue |
761 | 762 |
762 return tmp, max_z_score | 763 return tmp, max_z_score |
763 | 764 |
764 def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str], *, fromRAS = True) -> None: | 765 def computeEnrichment(metabMap: ET.ElementTree, class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: |
765 """ | 766 """ |
766 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the | 767 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the |
767 provided metabolic map. | 768 provided metabolic map. |
768 | 769 |
769 Args: | 770 Args: |
771 class_pat : the clustered data. | 772 class_pat : the clustered data. |
772 ids : ids for data association. | 773 ids : ids for data association. |
773 fromRAS : whether the data to enrich consists of RAS scores. | 774 fromRAS : whether the data to enrich consists of RAS scores. |
774 | 775 |
775 Returns: | 776 Returns: |
776 None | 777 List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score. |
777 | 778 |
778 Raises: | 779 Raises: |
779 sys.exit : if there are less than 2 classes for comparison | 780 sys.exit : if there are less than 2 classes for comparison |
780 | 781 |
781 Side effects: | 782 Side effects: |
782 metabMap : mut | 783 metabMap : mutates based on calculated enrichment |
783 ids : mut | 784 """ |
784 """ | 785 class_pat = {k.strip(): v for k, v in class_pat.items()} |
785 class_pat = { k.strip() : v for k, v in class_pat.items() } | 786 if (not class_pat) or (len(class_pat.keys()) < 2): |
786 #TODO: simplfy this stuff vvv and stop using sys.exit (raise the correct utils error) | 787 sys.exit('Execution aborted: classes provided for comparisons are less than two\n') |
787 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: classes provided for comparisons are less than two\n') | 788 |
789 enrichment_results = [] | |
788 | 790 |
789 if ARGS.comparison == "manyvsmany": | 791 if ARGS.comparison == "manyvsmany": |
790 for i, j in it.combinations(class_pat.keys(), 2): | 792 for i, j in it.combinations(class_pat.keys(), 2): |
791 #TODO: these 2 functions are always called in pair and in this order and need common data, | |
792 # some clever refactoring would be appreciated. | |
793 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) | 793 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) |
794 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, i, j, fromRAS) | 794 enrichment_results.append((i, j, comparisonDict, max_z_score)) |
795 | 795 |
796 elif ARGS.comparison == "onevsrest": | 796 elif ARGS.comparison == "onevsrest": |
797 for single_cluster in class_pat.keys(): | 797 for single_cluster in class_pat.keys(): |
798 t :List[List[List[float]]] = [] | 798 rest = [item for k, v in class_pat.items() if k != single_cluster for item in v] |
799 for k in class_pat.keys(): | |
800 if k != single_cluster: | |
801 t.append(class_pat.get(k)) | |
802 | |
803 rest :List[List[float]] = [] | |
804 for i in t: | |
805 rest = rest + i | |
806 | |
807 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids) | 799 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids) |
808 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, single_cluster, fromRAS) | 800 enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score)) |
809 | 801 |
810 elif ARGS.comparison == "onevsmany": | 802 elif ARGS.comparison == "onevsmany": |
811 controlItems = class_pat.get(ARGS.control) | 803 controlItems = class_pat.get(ARGS.control) |
812 for otherDataset in class_pat.keys(): | 804 for otherDataset in class_pat.keys(): |
813 if otherDataset == ARGS.control: continue | 805 if otherDataset == ARGS.control: |
814 | 806 continue |
815 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) | 807 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) |
816 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, ARGS.control, otherDataset, fromRAS) | 808 enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score)) |
817 | 809 |
818 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None: | 810 return enrichment_results |
819 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG) | 811 |
812 def createOutputMaps(dataset1Name: str, dataset2Name: str, core_map: ET.ElementTree) -> None: | |
813 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG) | |
820 utils.writeSvg(svgFilePath, core_map) | 814 utils.writeSvg(svgFilePath, core_map) |
821 | 815 |
822 if ARGS.generate_pdf: | 816 if ARGS.generate_pdf: |
823 pngPath = buildOutputPath(dataset1Name, dataset2Name, details = "PNG Map", ext = utils.FileFormat.PNG) | 817 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) |
824 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details = "PDF Map", ext = utils.FileFormat.PDF) | 818 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) |
825 convert_to_pdf(svgFilePath, pngPath, pdfPath) | 819 convert_to_pdf(svgFilePath, pngPath, pdfPath) |
826 | 820 |
827 if not ARGS.generate_svg: os.remove(svgFilePath.show()) | 821 if not ARGS.generate_svg: |
822 os.remove(svgFilePath) | |
828 | 823 |
829 ClassPat = Dict[str, List[List[float]]] | 824 ClassPat = Dict[str, List[List[float]]] |
830 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: | 825 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: |
831 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, | 826 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, |
832 # for the sake of everyone's sanity. | 827 # for the sake of everyone's sanity. |
878 None | 873 None |
879 | 874 |
880 Raises: | 875 Raises: |
881 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) | 876 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) |
882 """ | 877 """ |
883 | |
884 global ARGS | 878 global ARGS |
885 ARGS = process_args() | 879 ARGS = process_args() |
886 | 880 |
887 if os.path.isdir('result') == False: os.makedirs('result') | 881 if not os.path.isdir('result'): |
888 | 882 os.makedirs('result') |
889 core_map :ET.ElementTree = ARGS.choice_map.getMap( | 883 |
884 core_map: ET.ElementTree = ARGS.choice_map.getMap( | |
890 ARGS.tool_dir, | 885 ARGS.tool_dir, |
891 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) | 886 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) |
892 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given. | 887 |
893 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner | |
894 # solution can be derived from my comment in FilePath.fromStrPath | |
895 | |
896 if ARGS.using_RAS: | 888 if ARGS.using_RAS: |
897 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names) | 889 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names) |
898 computeEnrichment(core_map, class_pat, ids) | 890 enrichment_results = computeEnrichment(core_map, class_pat, ids) |
891 for i, j, comparisonDict, max_z_score in enrichment_results: | |
892 map_copy = copy.deepcopy(core_map) | |
893 temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j, fromRAS=True) | |
894 createOutputMaps(i, j, map_copy) | |
899 | 895 |
900 if ARGS.using_RPS: | 896 if ARGS.using_RPS: |
901 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) | 897 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) |
902 computeEnrichment(core_map, class_pat, ids, fromRAS = False) | 898 enrichment_results = computeEnrichment(core_map, class_pat, ids, fromRAS=False) |
903 | 899 for i, j, comparisonDict, max_z_score in enrichment_results: |
904 # create output files: TODO: this is the same comparison happening in "maps", find a better way to organize this | 900 map_copy = copy.deepcopy(core_map) |
905 if ARGS.comparison == "manyvsmany": | 901 temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j, fromRAS=False) |
906 for i, j in it.combinations(class_pat.keys(), 2): createOutputMaps(i, j, core_map) | 902 createOutputMaps(i, j, map_copy) |
907 return | 903 |
908 | 904 print('Execution succeeded') |
909 if ARGS.comparison == "onevsrest": | |
910 for single_cluster in class_pat.keys(): createOutputMaps(single_cluster, "rest", core_map) | |
911 return | |
912 | |
913 for otherDataset in class_pat.keys(): | |
914 if otherDataset != ARGS.control: createOutputMaps(i, j, core_map) | |
915 | |
916 if not ERRORS: return | |
917 utils.logWarning( | |
918 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}", | |
919 ARGS.out_log) | |
920 | |
921 print('Execution succeded') | |
922 | |
923 ############################################################################### | 905 ############################################################################### |
924 if __name__ == "__main__": | 906 if __name__ == "__main__": |
925 main() | 907 main() |