diff COBRAxy/rps_generator.py @ 489:97eea560a10f draft

Uploaded
author francesco_lapi
date Mon, 29 Sep 2025 10:33:26 +0000
parents 187cee1a00e2
children
line wrap: on
line diff
--- a/COBRAxy/rps_generator.py	Tue Sep 23 13:48:24 2025 +0000
+++ b/COBRAxy/rps_generator.py	Mon Sep 29 10:33:26 2025 +0000
@@ -1,3 +1,10 @@
+"""
+Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry.
+
+Given a tabular dataset (metabolites x samples) and a reaction set, this script
+maps metabolite names via synonyms, fills missing metabolites, and computes RPS
+per reaction for each sample using a log-normalized formula.
+"""
 import math
 import argparse
 
@@ -22,17 +29,14 @@
     Returns:
         Namespace: An object containing parsed arguments.
     """
-    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
-                                     description = 'process some value\'s'+
-                                     ' abundances and reactions to create RPS scores.')
-    parser.add_argument('-rc', '--reaction_choice', 
-                        type = str,
-                        default = 'default',
-                        choices = ['default','custom'], 
-                        help = 'chose which type of reaction dataset you want use')
-    parser.add_argument('-cm', '--custom',
-                        type = str,
-                        help='your dataset if you want custom reactions')
+    parser = argparse.ArgumentParser(
+        usage='%(prog)s [options]',
+        description='Process abundances and reactions to create RPS scores.'
+    )
+    
+    parser.add_argument("-rl", "--model_upload", type = str,
+        help = "path to input file containing the reactions")
+
     parser.add_argument('-td', '--tool_dir',
                         type = str,
                         required = True,
@@ -57,8 +61,8 @@
     Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
 
     Args:
-        name_data : name associated with the dataset (from frontend input params)
-        count : counter from 1 to make these names unique (external)
+    name_data: Name associated with the dataset (from frontend input params).
+    count: Counter starting at 1 to make names unique when default.
 
     Returns:
         str : the name made unique
@@ -84,7 +88,7 @@
                            Returns None if the cell index is invalid.
     """
     if cell_line_index < 0 or cell_line_index >= len(dataset.index):
-        print(f"Errore: This cell line index: '{cell_line_index}' is not valid.")
+        print(f"Error: cell line index '{cell_line_index}' is not valid.")
         return None
 
     cell_line_name = dataset.columns[cell_line_index]
@@ -121,7 +125,8 @@
     """
     name = clean_metabolite_name(name)
     for id, synonyms in syn_dict.items():
-        if name in synonyms: return id
+        if name in synonyms:
+            return id
     
     return ""
 
@@ -131,7 +136,8 @@
     Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly.
 
     Parameters:
-        reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values.
+        reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and 
+                          stoichiometric coefficients as values.
         dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines.
         cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites.
 
@@ -139,7 +145,7 @@
         list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1.
 
     Side effects:
-        dataset_by_rows : mut
+        dataset_by_rows: mutated to include missing metabolites with default abundances.
     """
     missing_list = []
     for reaction in reactions.values():
@@ -199,23 +205,27 @@
     Returns:
         None
     """
+
     cell_lines = dataset[0][1:]
     abundances_dict = {}
 
-    translationIsApplied = ARGS.reaction_choice == "default"
     for row in dataset[1:]:
-        id = get_metabolite_id(row[0], syn_dict) if translationIsApplied else row[0]
-        if id: abundances_dict[id] = list(map(utils.Float(), row[1:]))
-    
+        id = get_metabolite_id(row[0], syn_dict)
+        if id:
+            abundances_dict[id] = list(map(utils.Float(), row[1:]))
+
     missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines)))
-    
+
     rps_scores :Dict[Dict[str, float]] = {}
     for pos, cell_line_name in enumerate(cell_lines):
         abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() }
+
         rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list, substrateFreqTable)
     
     df = pd.DataFrame.from_dict(rps_scores)
-    
+    df = df.loc[list(reactions.keys()),:]
+    # Optional preview: first 10 rows
+    # print(df.head(10))
     df.index.name = 'Reactions'
     df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True)
 
@@ -230,29 +240,44 @@
     global ARGS
     ARGS = process_args(args)
 
-    # TODO:use utils functions vvv
+    # Load support data (black list and synonyms)
     with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl:
         black_list = pk.load(bl)
 
     with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd:
         syn_dict = pk.load(sd)
 
-    dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False)
+    dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False)
+    tmp_dict = None
 
-    if ARGS.reaction_choice == 'default':
-        reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb'))
-        substrateFreqTable = pk.load(open(ARGS.tool_dir + '/local/pickle files/substrate_frequencies.pickle', 'rb'))
-    
-    elif ARGS.reaction_choice == 'custom':
-        reactions = reactionUtils.parse_custom_reactions(ARGS.custom)
-        substrateFreqTable = {}
-        for _, substrates in reactions.items():
-            for substrateName, _ in substrates.items():
-                if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0
-                substrateFreqTable[substrateName] += 1
+    # Parse custom reactions from uploaded file
+    reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload)
+    for r, s in reactions.items():
+        tmp_list = list(s.keys())
+        for k in tmp_list:
+            if k[-2] == '_':
+                s[k[:-2]] = s.pop(k)
+    substrateFreqTable = {}
+    for _, substrates in reactions.items():
+        for substrateName, _ in substrates.items():
+            if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0
+            substrateFreqTable[substrateName] += 1
+
+    # Debug prints (can be enabled during troubleshooting)
+    # print(f"Reactions: {reactions}")
+    # print(f"Substrate Frequencies: {substrateFreqTable}")
+    # print(f"Synonyms: {syn_dict}")
+        tmp_dict = {}
+        for metabName, freq in substrateFreqTable.items():
+            tmp_metabName = clean_metabolite_name(metabName)
+            for syn_key, syn_list in syn_dict.items():
+                if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key):
+                    # print(f"Mapping {tmp_metabName} to {syn_key}")
+                    tmp_dict[syn_key] = syn_list
+                    tmp_dict[syn_key].append(tmp_metabName)
 
     rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable)
-    print('Execution succeded')
+    print('Execution succeeded')
 
 ##############################################################################
 if __name__ == "__main__": main()