Mercurial > repos > bimib > cobraxy
comparison COBRAxy/rps_generator.py @ 93:7e703e546998 draft
Uploaded
author | luca_milaz |
---|---|
date | Sun, 13 Oct 2024 11:41:34 +0000 |
parents | 41f35c2f0c7b |
children | 3fca9b568faf |
comparison
equal
deleted
inserted
replaced
92:fdf713bb5772 | 93:7e703e546998 |
---|---|
1 import re | |
2 import sys | |
3 import csv | |
4 import math | |
5 import argparse | |
6 | |
7 import numpy as np | |
8 import pickle as pk | |
9 import pandas as pd | |
10 | |
11 from enum import Enum | |
12 from typing import Optional, List, Dict, Tuple | |
13 | |
14 import utils.general_utils as utils | |
15 import utils.reaction_parsing as reactionUtils | |
16 | |
17 ########################## argparse ########################################## | |
18 ARGS :argparse.Namespace | |
19 def process_args() -> argparse.Namespace: | |
20 """ | |
21 Processes command-line arguments. | |
22 | |
23 Args: | |
24 args (list): List of command-line arguments. | |
25 | |
26 Returns: | |
27 Namespace: An object containing parsed arguments. | |
28 """ | |
29 parser = argparse.ArgumentParser(usage = '%(prog)s [options]', | |
30 description = 'process some value\'s'+ | |
31 ' abundances and reactions to create RPS scores.') | |
32 parser.add_argument('-rc', '--reaction_choice', | |
33 type = str, | |
34 default = 'default', | |
35 choices = ['default','custom'], | |
36 help = 'chose which type of reaction dataset you want use') | |
37 parser.add_argument('-cm', '--custom', | |
38 type = str, | |
39 help='your dataset if you want custom reactions') | |
40 parser.add_argument('-td', '--tool_dir', | |
41 type = str, | |
42 required = True, | |
43 help = 'your tool directory') | |
44 parser.add_argument('-ol', '--out_log', | |
45 help = "Output log") | |
46 parser.add_argument('-id', '--input', | |
47 type = str, | |
48 help = 'input dataset') | |
49 parser.add_argument('-rp', '--rps_output', | |
50 type = str, | |
51 required = True, | |
52 help = 'rps output') | |
53 | |
54 args = parser.parse_args() | |
55 return args | |
56 | |
57 ############################ dataset name ##################################### | |
58 def name_dataset(name_data :str, count :int) -> str: | |
59 """ | |
60 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | |
61 | |
62 Args: | |
63 name_data : name associated with the dataset (from frontend input params) | |
64 count : counter from 1 to make these names unique (external) | |
65 | |
66 Returns: | |
67 str : the name made unique | |
68 """ | |
69 if str(name_data) == 'Dataset': | |
70 return str(name_data) + '_' + str(count) | |
71 else: | |
72 return str(name_data) | |
73 | |
74 | |
75 ############################ get_abund_data #################################### | |
76 def get_abund_data(dataset: pd.DataFrame, cell_line_index:int) -> Optional[pd.Series]: | |
77 """ | |
78 Extracts abundance data and turns it into a series for a specific cell line from the dataset, which rows are | |
79 metabolites and columns are cell lines. | |
80 | |
81 Args: | |
82 dataset (pandas.DataFrame): The DataFrame containing abundance data for all cell lines and metabolites. | |
83 cell_line_index (int): The index of the cell line of interest in the dataset. | |
84 | |
85 Returns: | |
86 pd.Series or None: A series containing abundance values for the specified cell line. | |
87 The name of the series is the name of the cell line. | |
88 Returns None if the cell index is invalid. | |
89 """ | |
90 if cell_line_index < 0 or cell_line_index >= len(dataset.index): | |
91 print(f"Errore: This cell line index: '{cell_line_index}' is not valid.") | |
92 return None | |
93 | |
94 cell_line_name = dataset.columns[cell_line_index] | |
95 abundances_series = dataset[cell_line_name][1:] | |
96 | |
97 return abundances_series | |
98 | |
99 | |
100 ############################ clean_metabolite_name #################################### | |
101 def clean_metabolite_name(name :str) -> str: | |
102 """ | |
103 Removes some characters from a metabolite's name, provided as input, and makes it lowercase in order to simplify | |
104 the search of a match in the dictionary of synonyms. | |
105 | |
106 Args: | |
107 name : the metabolite's name, as given in the dataset. | |
108 | |
109 Returns: | |
110 str : a new string with the cleaned name. | |
111 """ | |
112 return "".join(ch for ch in name if ch not in ",;-_'([{ }])").lower() | |
113 | |
114 | |
115 ############################ get_metabolite_id #################################### | |
116 def get_metabolite_id(name :str, syn_dict :Dict[str, List[str]]) -> str: | |
117 """ | |
118 Looks through a dictionary of synonyms to find a match for a given metabolite's name. | |
119 | |
120 Args: | |
121 name : the metabolite's name, as given in the dataset. | |
122 syn_dict : the dictionary of synonyms, using unique identifiers as keys and lists of clean synonyms as values. | |
123 | |
124 Returns: | |
125 str : the internal :str unique identifier of that metabolite, used in all other parts of the model in use. | |
126 An empty string is returned if a match isn't found. | |
127 """ | |
128 name = clean_metabolite_name(name) | |
129 for id, synonyms in syn_dict.items(): | |
130 if name in synonyms: return id | |
131 | |
132 return "" | |
133 | |
134 ############################ check_missing_metab #################################### | |
135 def check_missing_metab(reactions: Dict[str, Dict[str, int]], dataset_by_rows: Dict[str, List[float]], cell_lines_amt :int) -> List[str]: | |
136 """ | |
137 Check for missing metabolites in the abundances dictionary compared to the reactions dictionary and update abundances accordingly. | |
138 | |
139 Parameters: | |
140 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | |
141 dataset_by_rows (dict): A dictionary representing abundances where keys are metabolite names and values are their corresponding abundances for all cell lines. | |
142 cell_lines_amt : amount of cell lines, needed to add a new list of abundances for missing metabolites. | |
143 | |
144 Returns: | |
145 list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1. | |
146 | |
147 Side effects: | |
148 dataset_by_rows : mut | |
149 """ | |
150 missing_list = [] | |
151 for reaction in reactions.values(): | |
152 for metabolite in reaction.keys(): | |
153 if metabolite not in dataset_by_rows: | |
154 dataset_by_rows[metabolite] = [1] * cell_lines_amt | |
155 missing_list.append(metabolite) | |
156 | |
157 return missing_list | |
158 | |
159 ############################ calculate_rps #################################### | |
160 def calculate_rps(reactions: Dict[str, Dict[str, int]], abundances: Dict[str, float], black_list: List[str], missing_list: List[str]) -> Dict[str, float]: | |
161 """ | |
162 Calculate the Reaction Propensity scores (RPS) based on the availability of reaction substrates, for (ideally) each input model reaction and for each sample. | |
163 The score is computed as the product of the concentrations of the reacting substances, with each concentration raised to a power equal to its stoichiometric coefficient | |
164 for each reaction using the provided coefficient and abundance values. | |
165 | |
166 Parameters: | |
167 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | |
168 abundances (dict): A dictionary representing metabolite abundances where keys are metabolite names and values are their corresponding abundances. | |
169 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation. | |
170 missing_list (list): A list containing metabolite names that were missing in the original abundances dictionary and thus their values were set to 1. | |
171 | |
172 Returns: | |
173 dict: A dictionary containing Reaction Propensity Scores (RPS) where keys are reaction names and values are the corresponding RPS scores. | |
174 """ | |
175 rps_scores = {} | |
176 | |
177 for reaction_name, substrates in reactions.items(): | |
178 total_contribution = 1 | |
179 metab_significant = False | |
180 for metabolite, stoichiometry in substrates.items(): | |
181 temp = 1 if math.isnan(abundances[metabolite]) else abundances[metabolite] | |
182 if metabolite not in black_list and metabolite not in missing_list: | |
183 metab_significant = True | |
184 total_contribution *= temp ** stoichiometry | |
185 | |
186 rps_scores[reaction_name] = total_contribution if metab_significant else math.nan | |
187 | |
188 return rps_scores | |
189 | |
190 | |
191 ############################ rps_for_cell_lines #################################### | |
192 def rps_for_cell_lines(dataset: List[List[str]], reactions: Dict[str, Dict[str, int]], black_list: List[str], syn_dict: Dict[str, List[str]]) -> None: | |
193 """ | |
194 Calculate Reaction Propensity Scores (RPS) for each cell line represented in the dataframe and creates an output file. | |
195 | |
196 Parameters: | |
197 dataset : the dataset's data, by rows | |
198 reactions (dict): A dictionary representing reactions where keys are reaction names and values are dictionaries containing metabolite names as keys and stoichiometric coefficients as values. | |
199 black_list (list): A list containing metabolite names that should be excluded from the RPS calculation. | |
200 syn_dict (dict): A dictionary where keys are general metabolite names and values are lists of possible synonyms. | |
201 | |
202 Returns: | |
203 None | |
204 """ | |
205 cell_lines = dataset[0][1:] | |
206 abundances_dict = {} | |
207 | |
208 translationIsApplied = ARGS.reaction_choice == "default" | |
209 for row in dataset[1:]: | |
210 id = get_metabolite_id(row[0], syn_dict) if translationIsApplied else row[0] | |
211 if id: abundances_dict[id] = list(map(utils.Float(), row[1:])) | |
212 | |
213 missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) | |
214 | |
215 rps_scores :Dict[Dict[str, float]] = {} | |
216 for pos, cell_line_name in enumerate(cell_lines): | |
217 abundances = { metab : abundances[pos] for metab, abundances in abundances_dict.items() } | |
218 rps_scores[cell_line_name] = calculate_rps(reactions, abundances, black_list, missing_list) | |
219 | |
220 df = pd.DataFrame.from_dict(rps_scores) | |
221 df.rename(columns={'Unnamed: 0': 'Reactions'}, inplace=True) | |
222 df.to_csv(ARGS.rps_output, sep = '\t', na_rep = "None", index = False) | |
223 | |
224 ############################ main #################################### | |
225 def main() -> None: | |
226 """ | |
227 Initializes everything and sets the program in motion based on the fronted input arguments. | |
228 | |
229 Returns: | |
230 None | |
231 """ | |
232 global ARGS | |
233 ARGS = process_args() | |
234 | |
235 # TODO:use utils functions vvv | |
236 with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: | |
237 black_list = pk.load(bl) | |
238 | |
239 with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: | |
240 syn_dict = pk.load(sd) | |
241 | |
242 dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False) | |
243 | |
244 if ARGS.reaction_choice == 'default': | |
245 reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb')) | |
246 | |
247 elif ARGS.reaction_choice == 'custom': | |
248 reactions = reactionUtils.parse_custom_reactions(ARGS.custom) | |
249 | |
250 rps_for_cell_lines(dataset, reactions, black_list, syn_dict) | |
251 print('Execution succeded') | |
252 | |
253 ############################################################################## | |
254 if __name__ == "__main__": | |
255 main() |