Mercurial > repos > bimib > cobraxy
comparison COBRAxy/src/utils/CBS_backend.py @ 539:2fb97466e404 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Sat, 25 Oct 2025 14:55:13 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 538:fd53d42348bd | 539:2fb97466e404 |
|---|---|
| 1 """ | |
| 2 CBS backend utilities using GLPK for constraint-based sampling. | |
| 3 | |
| 4 This module builds and solves LP problems from COBRA models, supports random | |
| 5 objective function generation (CBS), and provides both swiglpk-based and | |
| 6 COBRApy-based sampling fallbacks. | |
| 7 """ | |
| 8 from swiglpk import * | |
| 9 import random | |
| 10 import pandas as pd | |
| 11 import numpy as np | |
| 12 import cobra as cb | |
| 13 | |
| 14 # Initialize LP problem | |
| 15 def initialize_lp_problem(S): | |
| 16 """ | |
| 17 Prepare sparse matrix structures for GLPK given a stoichiometric matrix. | |
| 18 | |
| 19 Args: | |
| 20 S: Stoichiometric matrix (DOK or similar) as returned by cobra.util.create_stoichiometric_matrix. | |
| 21 | |
| 22 Returns: | |
| 23 tuple: (len_vector, values, indexes, ia, ja, ar, nrows, ncol) | |
| 24 - len_vector: number of non-zero entries | |
| 25 - values: list of non-zero values | |
| 26 - indexes: list of (row, col) indices for non-zero entries | |
| 27 - ia, ja, ar: GLPK-ready arrays for the sparse matrix | |
| 28 - nrows, ncol: matrix shape | |
| 29 """ | |
| 30 | |
| 31 len_vector=len(S.keys()) | |
| 32 values=list(S.values()) | |
| 33 indexes=list(S.keys()) | |
| 34 ia = intArray(len_vector+1); | |
| 35 ja = intArray(len_vector+1); | |
| 36 ar = doubleArray(len_vector+1); | |
| 37 | |
| 38 i=0 | |
| 39 ind_row=[indexes[i][0]+1 for i in range(0, len(values) )] | |
| 40 ind_col=[indexes[i][1]+1 for i in range(0, len(values) )] | |
| 41 for i in range(1, len(values) + 1): | |
| 42 ia[i]=ind_row[i-1] | |
| 43 ja[i]=ind_col[i-1] | |
| 44 ar[i] = values[i-1] | |
| 45 | |
| 46 nrows=S.shape[0] | |
| 47 ncol=S.shape[1] | |
| 48 | |
| 49 return len_vector, values, indexes, ia, ja, ar, nrows, ncol | |
| 50 | |
| 51 | |
| 52 | |
| 53 def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar, | |
| 54 obj_coefs,reactions,return_lp=False): | |
| 55 """ | |
| 56 Create and solve a GLPK LP problem for a metabolic model. | |
| 57 | |
| 58 Args: | |
| 59 lb, ub: Lower/upper bounds per reaction (lists of floats). | |
| 60 nrows, ncol: Dimensions of the S matrix. | |
| 61 len_vector, ia, ja, ar: Sparse matrix data prepared for GLPK. | |
| 62 obj_coefs: Objective function coefficients (list of floats). | |
| 63 reactions: Reaction identifiers (list of str), used for output mapping. | |
| 64 return_lp: If True, also return the GLPK problem object (caller must delete). | |
| 65 | |
| 66 Returns: | |
| 67 tuple: (fluxes, Z) or (fluxes, Z, lp) if return_lp=True. | |
| 68 """ | |
| 69 | |
| 70 | |
| 71 lp = glp_create_prob(); | |
| 72 glp_set_prob_name(lp, "sample"); | |
| 73 glp_set_obj_dir(lp, GLP_MAX); | |
| 74 glp_add_rows(lp, nrows); | |
| 75 eps = 1e-16 | |
| 76 for i in range(nrows): | |
| 77 glp_set_row_name(lp, i+1, "constrain_"+str(i+1)); | |
| 78 glp_set_row_bnds(lp, i+1, GLP_FX, 0.0, 0.0); | |
| 79 glp_add_cols(lp, ncol); | |
| 80 for i in range(ncol): | |
| 81 glp_set_col_name(lp, i+1, "flux_"+str(i+1)); | |
| 82 glp_set_col_bnds(lp, i+1, GLP_DB,lb[i]-eps,ub[i]+eps); | |
| 83 glp_load_matrix(lp, len_vector, ia, ja, ar); | |
| 84 | |
| 85 try: | |
| 86 fluxes,Z=solve_lp_problem(lp,obj_coefs,reactions) | |
| 87 if return_lp: | |
| 88 return fluxes,Z,lp | |
| 89 else: | |
| 90 glp_delete_prob(lp); | |
| 91 return fluxes,Z | |
| 92 except Exception as e: | |
| 93 glp_delete_prob(lp) | |
| 94 raise Exception(e) | |
| 95 | |
| 96 | |
| 97 def solve_lp_problem(lp,obj_coefs,reactions): | |
| 98 """ | |
| 99 Configure and solve an LP with GLPK using provided objective coefficients. | |
| 100 | |
| 101 Args: | |
| 102 lp: GLPK problem handle. | |
| 103 obj_coefs: Objective coefficients. | |
| 104 reactions: Reaction identifiers (unused in computation; length used for output). | |
| 105 | |
| 106 Returns: | |
| 107 tuple: (fluxes, Z) where fluxes is a list of primal column values and Z is the objective value. | |
| 108 """ | |
| 109 | |
| 110 # Set the coefficients of the objective function | |
| 111 i=1 | |
| 112 for ind_coef in obj_coefs: | |
| 113 glp_set_obj_coef(lp, i, ind_coef); | |
| 114 i+=1 | |
| 115 | |
| 116 # Initialize the parameters | |
| 117 params=glp_smcp() | |
| 118 params.presolve=GLP_ON | |
| 119 params.msg_lev = GLP_MSG_ERR | |
| 120 params.tm_lim=4000 | |
| 121 glp_init_smcp(params) | |
| 122 | |
| 123 glp_term_out(GLP_OFF) | |
| 124 | |
| 125 try: | |
| 126 | |
| 127 # Solve the problem | |
| 128 glp_scale_prob(lp,GLP_SF_AUTO) | |
| 129 | |
| 130 value=glp_simplex(lp, params) | |
| 131 | |
| 132 Z = glp_get_obj_val(lp); | |
| 133 | |
| 134 if value == 0: | |
| 135 fluxes = [] | |
| 136 for i in range(len(reactions)): fluxes.append(glp_get_col_prim(lp, i+1)) | |
| 137 return fluxes,Z | |
| 138 else: | |
| 139 raise Exception("error in LP problem. Problem:",str(value)) | |
| 140 except Exception as e: | |
| 141 # Re-enable terminal output for error reporting | |
| 142 glp_term_out(GLP_ON) | |
| 143 raise Exception(e) | |
| 144 finally: | |
| 145 # Re-enable terminal output after solving | |
| 146 glp_term_out(GLP_ON) | |
| 147 | |
| 148 def create_lp_structure(model): | |
| 149 """ | |
| 150 Extract the LP structure (S matrix, bounds, objective) from a COBRA model. | |
| 151 | |
| 152 Args: | |
| 153 model (cobra.Model): The COBRA model. | |
| 154 | |
| 155 Returns: | |
| 156 tuple: (S, lb, ub, coefs_obj, reactions) | |
| 157 """ | |
| 158 | |
| 159 reactions=[el.id for el in model.reactions] | |
| 160 coefs_obj=[reaction.objective_coefficient for reaction in model.reactions] | |
| 161 | |
| 162 # Lower and upper bounds | |
| 163 lb=[reaction.lower_bound for reaction in model.reactions] | |
| 164 ub=[reaction.upper_bound for reaction in model.reactions] | |
| 165 | |
| 166 # Create S matrix | |
| 167 S=cb.util.create_stoichiometric_matrix(model,array_type="dok") | |
| 168 | |
| 169 return S,lb,ub,coefs_obj,reactions | |
| 170 | |
| 171 def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample): | |
| 172 """ | |
| 173 Sample fluxes using GLPK by iterating over random objective functions. | |
| 174 | |
| 175 Args: | |
| 176 model: COBRA model. | |
| 177 nsample: Number of samples to generate. | |
| 178 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem). | |
| 179 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions). | |
| 180 | |
| 181 Returns: | |
| 182 None | |
| 183 """ | |
| 184 | |
| 185 S,lb,ub,coefs_obj,reactions = create_lp_structure(model) | |
| 186 len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S) | |
| 187 | |
| 188 for i in range(nsample): | |
| 189 | |
| 190 coefs_obj=coefficients_df.iloc[:,i].values | |
| 191 | |
| 192 if coefs_obj[-1]==1: # minimize | |
| 193 coefs_obj= coefs_obj[0:-1] * -1 | |
| 194 else: | |
| 195 coefs_obj=coefs_obj[0:-1] | |
| 196 | |
| 197 fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector, | |
| 198 ia, ja, ar, coefs_obj,reactions,return_lp=False) | |
| 199 df_sample.loc[i] = fluxes | |
| 200 return | |
| 201 | |
| 202 def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample): | |
| 203 """ | |
| 204 Fallback sampling using COBRApy's optimize with per-sample randomized objectives. | |
| 205 | |
| 206 Args: | |
| 207 model: COBRA model. | |
| 208 nsample: Number of samples to generate. | |
| 209 coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem). | |
| 210 df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions). | |
| 211 | |
| 212 Returns: | |
| 213 None | |
| 214 """ | |
| 215 | |
| 216 for i in range(nsample): | |
| 217 | |
| 218 dict_coeff={} | |
| 219 if(coefficients_df.iloc[-1][i]==1): | |
| 220 type_problem = -1 # minimize | |
| 221 else: | |
| 222 type_problem = 1 # maximize | |
| 223 | |
| 224 for rxn in [reaction.id for reaction in model.reactions]: | |
| 225 dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem | |
| 226 | |
| 227 model.objective = dict_coeff | |
| 228 solution = model.optimize().fluxes | |
| 229 for rxn, flux in solution.items(): | |
| 230 df_sample.loc[i][rxn] = flux | |
| 231 | |
| 232 return | |
| 233 | |
| 234 def randomObjectiveFunction(model, n_samples, df_fva, seed=0): | |
| 235 """ | |
| 236 Create random objective function coefficients for CBS sampling. | |
| 237 | |
| 238 The last row 'type_of_problem' encodes 0 for maximize and 1 for minimize. | |
| 239 | |
| 240 Args: | |
| 241 model: COBRA model. | |
| 242 n_samples: Number of random objectives to generate. | |
| 243 df_fva: Flux Variability Analysis results with 'minimum' and 'maximum' per reaction. | |
| 244 seed: Seed for reproducibility. | |
| 245 | |
| 246 Returns: | |
| 247 pandas.DataFrame: Coefficients DataFrame indexed by reaction IDs plus 'type_of_problem'. | |
| 248 """ | |
| 249 # reactions = model.reactions | |
| 250 reactions = [reaction.id for reaction in model.reactions] | |
| 251 cont = seed | |
| 252 list_ex = reactions.copy() | |
| 253 list_ex.append("type_of_problem") | |
| 254 coefficients_df = pd.DataFrame(index=list_ex, columns=[str(i) for i in range(n_samples)]) | |
| 255 | |
| 256 for i in range(0, n_samples): | |
| 257 | |
| 258 cont = cont + 1 | |
| 259 random.seed(cont) | |
| 260 | |
| 261 # Generate a random threshold in [0, 1] | |
| 262 threshold = random.random() | |
| 263 | |
| 264 for reaction in reactions: | |
| 265 | |
| 266 cont = cont + 1 | |
| 267 random.seed(cont) | |
| 268 | |
| 269 val = random.random() | |
| 270 | |
| 271 if val > threshold: | |
| 272 | |
| 273 cont = cont + 1 | |
| 274 random.seed(cont) | |
| 275 | |
| 276 # Coefficient in [-1, 1] | |
| 277 c = 2 * random.random() - 1 | |
| 278 | |
| 279 val_max = np.max([abs(df_fva.loc[reaction, "minimum"]), abs(df_fva.loc[reaction, "maximum"])]) | |
| 280 | |
| 281 if val_max != 0: # only if FVA is non-zero | |
| 282 coefficients_df.loc[reaction, str(i)] = c / val_max # scale by FVA | |
| 283 else: | |
| 284 coefficients_df.loc[reaction, str(i)] = 0 | |
| 285 | |
| 286 else: | |
| 287 coefficients_df.loc[reaction, str(i)] = 0 | |
| 288 | |
| 289 cont = cont + 1 | |
| 290 random.seed(cont) | |
| 291 | |
| 292 if random.random() < 0.5: | |
| 293 coefficients_df.loc["type_of_problem", str(i)] = 0 # maximize | |
| 294 else: | |
| 295 coefficients_df.loc["type_of_problem", str(i)] = 1 # minimize | |
| 296 | |
| 297 return coefficients_df |
