comparison COBRAxy/src/utils/CBS_backend.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children
comparison
equal deleted inserted replaced
538:fd53d42348bd 539:2fb97466e404
1 """
2 CBS backend utilities using GLPK for constraint-based sampling.
3
4 This module builds and solves LP problems from COBRA models, supports random
5 objective function generation (CBS), and provides both swiglpk-based and
6 COBRApy-based sampling fallbacks.
7 """
8 from swiglpk import *
9 import random
10 import pandas as pd
11 import numpy as np
12 import cobra as cb
13
14 # Initialize LP problem
15 def initialize_lp_problem(S):
16 """
17 Prepare sparse matrix structures for GLPK given a stoichiometric matrix.
18
19 Args:
20 S: Stoichiometric matrix (DOK or similar) as returned by cobra.util.create_stoichiometric_matrix.
21
22 Returns:
23 tuple: (len_vector, values, indexes, ia, ja, ar, nrows, ncol)
24 - len_vector: number of non-zero entries
25 - values: list of non-zero values
26 - indexes: list of (row, col) indices for non-zero entries
27 - ia, ja, ar: GLPK-ready arrays for the sparse matrix
28 - nrows, ncol: matrix shape
29 """
30
31 len_vector=len(S.keys())
32 values=list(S.values())
33 indexes=list(S.keys())
34 ia = intArray(len_vector+1);
35 ja = intArray(len_vector+1);
36 ar = doubleArray(len_vector+1);
37
38 i=0
39 ind_row=[indexes[i][0]+1 for i in range(0, len(values) )]
40 ind_col=[indexes[i][1]+1 for i in range(0, len(values) )]
41 for i in range(1, len(values) + 1):
42 ia[i]=ind_row[i-1]
43 ja[i]=ind_col[i-1]
44 ar[i] = values[i-1]
45
46 nrows=S.shape[0]
47 ncol=S.shape[1]
48
49 return len_vector, values, indexes, ia, ja, ar, nrows, ncol
50
51
52
53 def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar,
54 obj_coefs,reactions,return_lp=False):
55 """
56 Create and solve a GLPK LP problem for a metabolic model.
57
58 Args:
59 lb, ub: Lower/upper bounds per reaction (lists of floats).
60 nrows, ncol: Dimensions of the S matrix.
61 len_vector, ia, ja, ar: Sparse matrix data prepared for GLPK.
62 obj_coefs: Objective function coefficients (list of floats).
63 reactions: Reaction identifiers (list of str), used for output mapping.
64 return_lp: If True, also return the GLPK problem object (caller must delete).
65
66 Returns:
67 tuple: (fluxes, Z) or (fluxes, Z, lp) if return_lp=True.
68 """
69
70
71 lp = glp_create_prob();
72 glp_set_prob_name(lp, "sample");
73 glp_set_obj_dir(lp, GLP_MAX);
74 glp_add_rows(lp, nrows);
75 eps = 1e-16
76 for i in range(nrows):
77 glp_set_row_name(lp, i+1, "constrain_"+str(i+1));
78 glp_set_row_bnds(lp, i+1, GLP_FX, 0.0, 0.0);
79 glp_add_cols(lp, ncol);
80 for i in range(ncol):
81 glp_set_col_name(lp, i+1, "flux_"+str(i+1));
82 glp_set_col_bnds(lp, i+1, GLP_DB,lb[i]-eps,ub[i]+eps);
83 glp_load_matrix(lp, len_vector, ia, ja, ar);
84
85 try:
86 fluxes,Z=solve_lp_problem(lp,obj_coefs,reactions)
87 if return_lp:
88 return fluxes,Z,lp
89 else:
90 glp_delete_prob(lp);
91 return fluxes,Z
92 except Exception as e:
93 glp_delete_prob(lp)
94 raise Exception(e)
95
96
97 def solve_lp_problem(lp,obj_coefs,reactions):
98 """
99 Configure and solve an LP with GLPK using provided objective coefficients.
100
101 Args:
102 lp: GLPK problem handle.
103 obj_coefs: Objective coefficients.
104 reactions: Reaction identifiers (unused in computation; length used for output).
105
106 Returns:
107 tuple: (fluxes, Z) where fluxes is a list of primal column values and Z is the objective value.
108 """
109
110 # Set the coefficients of the objective function
111 i=1
112 for ind_coef in obj_coefs:
113 glp_set_obj_coef(lp, i, ind_coef);
114 i+=1
115
116 # Initialize the parameters
117 params=glp_smcp()
118 params.presolve=GLP_ON
119 params.msg_lev = GLP_MSG_ERR
120 params.tm_lim=4000
121 glp_init_smcp(params)
122
123 glp_term_out(GLP_OFF)
124
125 try:
126
127 # Solve the problem
128 glp_scale_prob(lp,GLP_SF_AUTO)
129
130 value=glp_simplex(lp, params)
131
132 Z = glp_get_obj_val(lp);
133
134 if value == 0:
135 fluxes = []
136 for i in range(len(reactions)): fluxes.append(glp_get_col_prim(lp, i+1))
137 return fluxes,Z
138 else:
139 raise Exception("error in LP problem. Problem:",str(value))
140 except Exception as e:
141 # Re-enable terminal output for error reporting
142 glp_term_out(GLP_ON)
143 raise Exception(e)
144 finally:
145 # Re-enable terminal output after solving
146 glp_term_out(GLP_ON)
147
148 def create_lp_structure(model):
149 """
150 Extract the LP structure (S matrix, bounds, objective) from a COBRA model.
151
152 Args:
153 model (cobra.Model): The COBRA model.
154
155 Returns:
156 tuple: (S, lb, ub, coefs_obj, reactions)
157 """
158
159 reactions=[el.id for el in model.reactions]
160 coefs_obj=[reaction.objective_coefficient for reaction in model.reactions]
161
162 # Lower and upper bounds
163 lb=[reaction.lower_bound for reaction in model.reactions]
164 ub=[reaction.upper_bound for reaction in model.reactions]
165
166 # Create S matrix
167 S=cb.util.create_stoichiometric_matrix(model,array_type="dok")
168
169 return S,lb,ub,coefs_obj,reactions
170
171 def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample):
172 """
173 Sample fluxes using GLPK by iterating over random objective functions.
174
175 Args:
176 model: COBRA model.
177 nsample: Number of samples to generate.
178 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem).
179 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions).
180
181 Returns:
182 None
183 """
184
185 S,lb,ub,coefs_obj,reactions = create_lp_structure(model)
186 len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S)
187
188 for i in range(nsample):
189
190 coefs_obj=coefficients_df.iloc[:,i].values
191
192 if coefs_obj[-1]==1: # minimize
193 coefs_obj= coefs_obj[0:-1] * -1
194 else:
195 coefs_obj=coefs_obj[0:-1]
196
197 fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector,
198 ia, ja, ar, coefs_obj,reactions,return_lp=False)
199 df_sample.loc[i] = fluxes
200 return
201
202 def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample):
203 """
204 Fallback sampling using COBRApy's optimize with per-sample randomized objectives.
205
206 Args:
207 model: COBRA model.
208 nsample: Number of samples to generate.
209 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem).
210 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions).
211
212 Returns:
213 None
214 """
215
216 for i in range(nsample):
217
218 dict_coeff={}
219 if(coefficients_df.iloc[-1][i]==1):
220 type_problem = -1 # minimize
221 else:
222 type_problem = 1 # maximize
223
224 for rxn in [reaction.id for reaction in model.reactions]:
225 dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem
226
227 model.objective = dict_coeff
228 solution = model.optimize().fluxes
229 for rxn, flux in solution.items():
230 df_sample.loc[i][rxn] = flux
231
232 return
233
234 def randomObjectiveFunction(model, n_samples, df_fva, seed=0):
235 """
236 Create random objective function coefficients for CBS sampling.
237
238 The last row 'type_of_problem' encodes 0 for maximize and 1 for minimize.
239
240 Args:
241 model: COBRA model.
242 n_samples: Number of random objectives to generate.
243 df_fva: Flux Variability Analysis results with 'minimum' and 'maximum' per reaction.
244 seed: Seed for reproducibility.
245
246 Returns:
247 pandas.DataFrame: Coefficients DataFrame indexed by reaction IDs plus 'type_of_problem'.
248 """
249 # reactions = model.reactions
250 reactions = [reaction.id for reaction in model.reactions]
251 cont = seed
252 list_ex = reactions.copy()
253 list_ex.append("type_of_problem")
254 coefficients_df = pd.DataFrame(index=list_ex, columns=[str(i) for i in range(n_samples)])
255
256 for i in range(0, n_samples):
257
258 cont = cont + 1
259 random.seed(cont)
260
261 # Generate a random threshold in [0, 1]
262 threshold = random.random()
263
264 for reaction in reactions:
265
266 cont = cont + 1
267 random.seed(cont)
268
269 val = random.random()
270
271 if val > threshold:
272
273 cont = cont + 1
274 random.seed(cont)
275
276 # Coefficient in [-1, 1]
277 c = 2 * random.random() - 1
278
279 val_max = np.max([abs(df_fva.loc[reaction, "minimum"]), abs(df_fva.loc[reaction, "maximum"])])
280
281 if val_max != 0: # only if FVA is non-zero
282 coefficients_df.loc[reaction, str(i)] = c / val_max # scale by FVA
283 else:
284 coefficients_df.loc[reaction, str(i)] = 0
285
286 else:
287 coefficients_df.loc[reaction, str(i)] = 0
288
289 cont = cont + 1
290 random.seed(cont)
291
292 if random.random() < 0.5:
293 coefficients_df.loc["type_of_problem", str(i)] = 0 # maximize
294 else:
295 coefficients_df.loc["type_of_problem", str(i)] = 1 # minimize
296
297 return coefficients_df