Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/CBS_backend.py @ 456:a6e45049c1b9 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 17:28:45 +0000 |
parents | 0a3ca20848f3 |
children |
comparison
equal
deleted
inserted
replaced
455:4e2bc80764b6 | 456:a6e45049c1b9 |
---|---|
1 """ | |
2 CBS backend utilities using GLPK for constraint-based sampling. | |
3 | |
4 This module builds and solves LP problems from COBRA models, supports random | |
5 objective function generation (CBS), and provides both swiglpk-based and | |
6 COBRApy-based sampling fallbacks. | |
7 """ | |
1 from swiglpk import * | 8 from swiglpk import * |
2 import random | 9 import random |
3 import pandas as pd | 10 import pandas as pd |
4 import numpy as np | 11 import numpy as np |
5 import cobra as cb | 12 import cobra as cb |
6 | 13 |
7 # Initialize LP problem | 14 # Initialize LP problem |
8 def initialize_lp_problem(S): | 15 def initialize_lp_problem(S): |
16 """ | |
17 Prepare sparse matrix structures for GLPK given a stoichiometric matrix. | |
18 | |
19 Args: | |
20 S: Stoichiometric matrix (DOK or similar) as returned by cobra.util.create_stoichiometric_matrix. | |
21 | |
22 Returns: | |
23 tuple: (len_vector, values, indexes, ia, ja, ar, nrows, ncol) | |
24 - len_vector: number of non-zero entries | |
25 - values: list of non-zero values | |
26 - indexes: list of (row, col) indices for non-zero entries | |
27 - ia, ja, ar: GLPK-ready arrays for the sparse matrix | |
28 - nrows, ncol: matrix shape | |
29 """ | |
9 | 30 |
10 len_vector=len(S.keys()) | 31 len_vector=len(S.keys()) |
11 values=list(S.values()) | 32 values=list(S.values()) |
12 indexes=list(S.keys()) | 33 indexes=list(S.keys()) |
13 ia = intArray(len_vector+1); | 34 ia = intArray(len_vector+1); |
27 | 48 |
28 return len_vector, values, indexes, ia, ja, ar, nrows, ncol | 49 return len_vector, values, indexes, ia, ja, ar, nrows, ncol |
29 | 50 |
30 | 51 |
31 | 52 |
32 # Solve LP problem from the structure of the metabolic model | |
33 def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar, | 53 def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar, |
34 obj_coefs,reactions,return_lp=False): | 54 obj_coefs,reactions,return_lp=False): |
55 """ | |
56 Create and solve a GLPK LP problem for a metabolic model. | |
57 | |
58 Args: | |
59 lb, ub: Lower/upper bounds per reaction (lists of floats). | |
60 nrows, ncol: Dimensions of the S matrix. | |
61 len_vector, ia, ja, ar: Sparse matrix data prepared for GLPK. | |
62 obj_coefs: Objective function coefficients (list of floats). | |
63 reactions: Reaction identifiers (list of str), used for output mapping. | |
64 return_lp: If True, also return the GLPK problem object (caller must delete). | |
65 | |
66 Returns: | |
67 tuple: (fluxes, Z) or (fluxes, Z, lp) if return_lp=True. | |
68 """ | |
35 | 69 |
36 | 70 |
37 lp = glp_create_prob(); | 71 lp = glp_create_prob(); |
38 glp_set_prob_name(lp, "sample"); | 72 glp_set_prob_name(lp, "sample"); |
39 glp_set_obj_dir(lp, GLP_MAX); | 73 glp_set_obj_dir(lp, GLP_MAX); |
58 except Exception as e: | 92 except Exception as e: |
59 glp_delete_prob(lp) | 93 glp_delete_prob(lp) |
60 raise Exception(e) | 94 raise Exception(e) |
61 | 95 |
62 | 96 |
63 # Solve LP problem from the structure of the metabolic model | |
64 def solve_lp_problem(lp,obj_coefs,reactions): | 97 def solve_lp_problem(lp,obj_coefs,reactions): |
98 """ | |
99 Configure and solve an LP with GLPK using provided objective coefficients. | |
100 | |
101 Args: | |
102 lp: GLPK problem handle. | |
103 obj_coefs: Objective coefficients. | |
104 reactions: Reaction identifiers (unused in computation; length used for output). | |
105 | |
106 Returns: | |
107 tuple: (fluxes, Z) where fluxes is a list of primal column values and Z is the objective value. | |
108 """ | |
65 | 109 |
66 # Set the coefficients of the objective function | 110 # Set the coefficients of the objective function |
67 i=1 | 111 i=1 |
68 for ind_coef in obj_coefs: | 112 for ind_coef in obj_coefs: |
69 glp_set_obj_coef(lp, i, ind_coef); | 113 glp_set_obj_coef(lp, i, ind_coef); |
99 raise Exception(e) | 143 raise Exception(e) |
100 finally: | 144 finally: |
101 # Re-enable terminal output after solving | 145 # Re-enable terminal output after solving |
102 glp_term_out(GLP_ON) | 146 glp_term_out(GLP_ON) |
103 | 147 |
104 # Create LP structure | |
105 def create_lp_structure(model): | 148 def create_lp_structure(model): |
149 """ | |
150 Extract the LP structure (S matrix, bounds, objective) from a COBRA model. | |
151 | |
152 Args: | |
153 model (cobra.Model): The COBRA model. | |
154 | |
155 Returns: | |
156 tuple: (S, lb, ub, coefs_obj, reactions) | |
157 """ | |
106 | 158 |
107 reactions=[el.id for el in model.reactions] | 159 reactions=[el.id for el in model.reactions] |
108 coefs_obj=[reaction.objective_coefficient for reaction in model.reactions] | 160 coefs_obj=[reaction.objective_coefficient for reaction in model.reactions] |
109 | 161 |
110 # Lower and upper bounds | 162 # Lower and upper bounds |
114 # Create S matrix | 166 # Create S matrix |
115 S=cb.util.create_stoichiometric_matrix(model,array_type="dok") | 167 S=cb.util.create_stoichiometric_matrix(model,array_type="dok") |
116 | 168 |
117 return S,lb,ub,coefs_obj,reactions | 169 return S,lb,ub,coefs_obj,reactions |
118 | 170 |
119 # CBS sampling interface | |
120 def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample): | 171 def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample): |
172 """ | |
173 Sample fluxes using GLPK by iterating over random objective functions. | |
174 | |
175 Args: | |
176 model: COBRA model. | |
177 nsample: Number of samples to generate. | |
178 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem). | |
179 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions). | |
180 | |
181 Returns: | |
182 None | |
183 """ | |
121 | 184 |
122 S,lb,ub,coefs_obj,reactions = create_lp_structure(model) | 185 S,lb,ub,coefs_obj,reactions = create_lp_structure(model) |
123 len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S) | 186 len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S) |
124 | 187 |
125 for i in range(nsample): | 188 for i in range(nsample): |
126 | 189 |
127 coefs_obj=coefficients_df.iloc[:,i].values | 190 coefs_obj=coefficients_df.iloc[:,i].values |
128 | 191 |
129 if coefs_obj[-1]==1: #minimize | 192 if coefs_obj[-1]==1: # minimize |
130 coefs_obj= coefs_obj[0:-1] * -1 | 193 coefs_obj= coefs_obj[0:-1] * -1 |
131 else: | 194 else: |
132 coefs_obj=coefs_obj[0:-1] | 195 coefs_obj=coefs_obj[0:-1] |
133 | 196 |
134 fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector, | 197 fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector, |
135 ia, ja, ar, coefs_obj,reactions,return_lp=False) | 198 ia, ja, ar, coefs_obj,reactions,return_lp=False) |
136 df_sample.loc[i] = fluxes | 199 df_sample.loc[i] = fluxes |
137 pass | 200 return |
138 | 201 |
139 def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample): | 202 def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample): |
203 """ | |
204 Fallback sampling using COBRApy's optimize with per-sample randomized objectives. | |
205 | |
206 Args: | |
207 model: COBRA model. | |
208 nsample: Number of samples to generate. | |
209 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem). | |
210 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions). | |
211 | |
212 Returns: | |
213 None | |
214 """ | |
140 | 215 |
141 for i in range(nsample): | 216 for i in range(nsample): |
142 | 217 |
143 dict_coeff={} | 218 dict_coeff={} |
144 if(coefficients_df.iloc[-1][i]==1): | 219 if(coefficients_df.iloc[-1][i]==1): |
145 type_problem = -1 #minimize | 220 type_problem = -1 # minimize |
146 else: | 221 else: |
147 type_problem = 1 | 222 type_problem = 1 # maximize |
148 | 223 |
149 for rxn in [reaction.id for reaction in model.reactions]: | 224 for rxn in [reaction.id for reaction in model.reactions]: |
150 dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem | 225 dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem |
151 | 226 |
152 model.objective = dict_coeff | 227 model.objective = dict_coeff |
153 solution = model.optimize().fluxes | 228 solution = model.optimize().fluxes |
154 for rxn, flux in solution.items(): | 229 for rxn, flux in solution.items(): |
155 df_sample.loc[i][rxn] = flux | 230 df_sample.loc[i][rxn] = flux |
156 | 231 |
157 pass | 232 return |
158 | 233 |
159 # Create random coefficients for CBS | |
160 def randomObjectiveFunction(model, n_samples, df_fva, seed=0): | 234 def randomObjectiveFunction(model, n_samples, df_fva, seed=0): |
161 | 235 """ |
162 | 236 Create random objective function coefficients for CBS sampling. |
163 #reactions = model.reactions | 237 |
164 reactions = [reaction.id for reaction in model.reactions] | 238 The last row 'type_of_problem' encodes 0 for maximize and 1 for minimize. |
165 cont=seed | 239 |
166 list_ex=reactions.copy() | 240 Args: |
167 list_ex.append("type_of_problem") | 241 model: COBRA model. |
168 coefficients_df = pd.DataFrame(index=list_ex,columns=[str(i) for i in range(n_samples)]) | 242 n_samples: Number of random objectives to generate. |
169 | 243 df_fva: Flux Variability Analysis results with 'minimum' and 'maximum' per reaction. |
170 for i in range(0, n_samples): | 244 seed: Seed for reproducibility. |
171 | 245 |
172 cont=cont+1 | 246 Returns: |
247 pandas.DataFrame: Coefficients DataFrame indexed by reaction IDs plus 'type_of_problem'. | |
248 """ | |
249 # reactions = model.reactions | |
250 reactions = [reaction.id for reaction in model.reactions] | |
251 cont = seed | |
252 list_ex = reactions.copy() | |
253 list_ex.append("type_of_problem") | |
254 coefficients_df = pd.DataFrame(index=list_ex, columns=[str(i) for i in range(n_samples)]) | |
255 | |
256 for i in range(0, n_samples): | |
257 | |
258 cont = cont + 1 | |
259 random.seed(cont) | |
260 | |
261 # Generate a random threshold in [0, 1] | |
262 threshold = random.random() | |
263 | |
264 for reaction in reactions: | |
265 | |
266 cont = cont + 1 | |
173 random.seed(cont) | 267 random.seed(cont) |
174 | 268 |
175 # Genera un numero casuale tra 0 e 1 | 269 val = random.random() |
176 threshold = random.random() #coefficiente tra 0 e 1 | 270 |
177 | 271 if val > threshold: |
178 for reaction in reactions: | 272 |
179 | 273 cont = cont + 1 |
180 cont=cont+1 | |
181 random.seed(cont) | 274 random.seed(cont) |
182 | 275 |
183 val=random.random() | 276 # Coefficient in [-1, 1] |
184 | 277 c = 2 * random.random() - 1 |
185 if val>threshold: | 278 |
186 | 279 val_max = np.max([abs(df_fva.loc[reaction, "minimum"]), abs(df_fva.loc[reaction, "maximum"])]) |
187 cont=cont+1 | 280 |
188 random.seed(cont) | 281 if val_max != 0: # only if FVA is non-zero |
189 | 282 coefficients_df.loc[reaction, str(i)] = c / val_max # scale by FVA |
190 c=2*random.random()-1 #coefficiente tra -1 e 1 | |
191 | |
192 val_max = np.max([abs(df_fva.loc[reaction,"minimum"]), abs(df_fva.loc[reaction,"maximum"])]) | |
193 | |
194 if val_max!=0: #solo se la fva รจ diversa da zero | |
195 coefficients_df.loc[reaction,str(i)] = c/val_max #divido per la fva | |
196 else: | |
197 coefficients_df.loc[reaction,str(i)] = 0 | |
198 | |
199 else: | 283 else: |
200 coefficients_df.loc[reaction,str(i)] = 0 | 284 coefficients_df.loc[reaction, str(i)] = 0 |
201 | 285 |
202 cont=cont+1 | |
203 random.seed(cont) | |
204 | |
205 if random.random()<0.5: | |
206 coefficients_df.loc["type_of_problem",str(i)] = 0 #maximize | |
207 else: | 286 else: |
208 coefficients_df.loc["type_of_problem",str(i)] = 1 #minimize | 287 coefficients_df.loc[reaction, str(i)] = 0 |
209 | 288 |
210 return coefficients_df | 289 cont = cont + 1 |
290 random.seed(cont) | |
291 | |
292 if random.random() < 0.5: | |
293 coefficients_df.loc["type_of_problem", str(i)] = 0 # maximize | |
294 else: | |
295 coefficients_df.loc["type_of_problem", str(i)] = 1 # minimize | |
296 | |
297 return coefficients_df |