Mercurial > repos > bimib > cobraxy
comparison COBRAxy/metabolic_model_setting.py @ 457:5b625d91bc7f draft
Uploaded
author | francesco_lapi |
---|---|
date | Wed, 17 Sep 2025 14:26:58 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
456:a6e45049c1b9 | 457:5b625d91bc7f |
---|---|
1 """ | |
2 Scripts to generate a tabular file of a metabolic model (built-in or custom). | |
3 | |
4 This script loads a COBRA model (built-in or custom), optionally applies | |
5 medium and gene nomenclature settings, derives reaction-related metadata | |
6 (GPR rules, formulas, bounds, objective coefficients, medium membership, | |
7 and compartments for ENGRO2), and writes a tabular summary. | |
8 """ | |
9 | |
10 import os | |
11 import csv | |
12 import cobra | |
13 import argparse | |
14 import pandas as pd | |
15 import utils.general_utils as utils | |
16 from typing import Optional, Tuple, List | |
17 import utils.model_utils as modelUtils | |
18 import logging | |
19 | |
# Populated once in main() from the frontend (Galaxy XML) arguments.
ARGS : argparse.Namespace
def process_args(args: List[str] = None) -> argparse.Namespace:
    """
    Parse command-line arguments for metabolic_model_setting.

    Args:
        args: Argument list to parse; None means use sys.argv (argparse default).

    Returns:
        argparse.Namespace: the parsed arguments.
    """
    parser = argparse.ArgumentParser(
        usage="%(prog)s [options]",
        description="Generate custom data from a given model",
    )

    # Required bookkeeping outputs
    parser.add_argument("--out_log", type=str, required=True,
                        help="Output log file")

    # Model selection: either a built-in identifier or a custom model file
    parser.add_argument("--model", type=str,
                        help="Built-in model identifier (e.g., ENGRO2, Recon, HMRcore)")
    parser.add_argument("--input", type=str,
                        help="Custom model file (JSON or XML)")
    parser.add_argument("--name", type=str, required=True,
                        help="Model name (default or custom)")

    # Post-load tweaks
    parser.add_argument("--medium_selector", type=str, required=True,
                        help="Medium selection option")
    parser.add_argument("--gene_format", type=str, default="Default",
                        help="Gene nomenclature format: Default (original), ENSNG, HGNC_SYMBOL, HGNC_ID, ENTREZ")

    # Output destination and tool location
    parser.add_argument("--out_tabular", type=str,
                        help="Output file for the merged dataset (CSV or XLSX)")
    parser.add_argument("--tool_dir", type=str, default=os.path.dirname(__file__),
                        help="Tool directory (passed from Galaxy as $__tool_directory__)")

    return parser.parse_args(args)
55 | |
56 ################################- INPUT DATA LOADING -################################ | |
def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
    """
    Loads a custom model from a file, either in JSON, XML, MAT, or YML format.

    Args:
        file_path : The path to the file containing the custom model.
        ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour.

    Raises:
        DataErr : if the file is in an invalid format or cannot be opened for whatever reason.

    Returns:
        cobra.Model : the model, if successfully opened.
    """
    # The explicit ext overrides the path's own extension (Galaxy renames uploads).
    ext = ext if ext else file_path.ext
    try:
        if ext is utils.FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())

        if ext is utils.FileFormat.JSON:
            return cobra.io.load_json_model(file_path.show())

        if ext is utils.FileFormat.MAT:
            return cobra.io.load_matlab_model(file_path.show())

        if ext is utils.FileFormat.YML:
            return cobra.io.load_yaml_model(file_path.show())

    except Exception as e:
        # Chain the original error so the cobra-level traceback is preserved.
        raise utils.DataErr(file_path, str(e)) from e
    # Fix: report the *effective* ext that was checked, not the path's own
    # extension (they differ when an explicit ext override was supplied).
    raise utils.DataErr(
        file_path,
        f"Unrecognized format '{ext}'. Only JSON, XML, MAT, YML are supported."
    )
90 | |
91 | |
92 ###############################- FILE SAVING -################################ | |
def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None:
    """
    Saves any dictionary-shaped data in a .csv file created at the given file_path as FilePath.

    Args:
        data : the data to be written to the file.
        file_path : the path to the .csv file.
        fieldNames : the names of the fields (columns) in the .csv file.

    Returns:
        None
    """
    key_col, value_col = fieldNames
    with open(file_path.show(), 'w', newline='') as out:
        # "excel-tab" dialect => tab-separated values despite the .csv name.
        writer = csv.DictWriter(out, fieldnames=fieldNames, dialect="excel-tab")
        writer.writeheader()
        writer.writerows({key_col: k, value_col: v} for k, v in data.items())
111 | |
def save_as_csv(data :dict, file_path :str, fieldNames :Tuple[str, str]) -> None:
    """
    Saves any dictionary-shaped data in a .csv file created at the given file_path as string.

    Args:
        data : the data to be written to the file.
        file_path : the path to the .csv file.
        fieldNames : the names of the fields (columns) in the .csv file.

    Returns:
        None
    """
    key_col, value_col = fieldNames
    with open(file_path, 'w', newline='') as out:
        # "excel-tab" dialect => tab-separated values despite the .csv name.
        writer = csv.DictWriter(out, fieldnames=fieldNames, dialect="excel-tab")
        writer.writeheader()
        writer.writerows({key_col: k, value_col: v} for k, v in data.items())
130 | |
def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
    """
    Save a pandas DataFrame as a tab-separated file, creating directories as needed.

    Args:
        df: The DataFrame to write.
        path: Destination file path (will be written as TSV).

    Raises:
        DataErr: If writing the output fails for any reason.

    Returns:
        None
    """
    try:
        # `or "."` handles bare filenames, whose dirname is the empty string.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        df.to_csv(path, sep="\t", index=False)
    except Exception as e:
        # Chain the original error so the root cause stays in the traceback.
        raise utils.DataErr(path, f"failed writing tabular output: {e}") from e
150 | |
151 | |
152 ###############################- ENTRY POINT -################################ | |
def main(args:List[str] = None) -> None:
    """
    Initialize and generate custom data based on the frontend input arguments.

    Loads the model (custom file or built-in), optionally applies a medium
    selection and gene-nomenclature translation, derives per-reaction metadata
    (GPR, formula, bounds, objective coefficients, medium membership, and —
    for ENGRO2 — compartments), merges everything on ReactionID, and writes
    the result as a TSV via save_as_tabular_df.

    Args:
        args: Argument list forwarded to process_args; None means sys.argv.

    Raises:
        utils.ArgsErr: unknown built-in model id, or missing --out_tabular.
        utils.DataErr: model load failure, or missing/empty output file.

    Returns:
        None
    """
    # Parse args from frontend (Galaxy XML)
    global ARGS
    ARGS = process_args(args)


    if ARGS.input:
        # Load a custom model from file.
        # NOTE(review): the format is taken from ARGS.name's extension, not
        # ARGS.input's — presumably because Galaxy renames uploaded files;
        # confirm against the tool XML.
        model = load_custom_model(
            utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
    else:
        # Load a built-in model

        try:
            model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2']
        except KeyError:
            raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model)

        # Load built-in model (Model.getCOBRAmodel uses tool_dir to locate local models)
        try:
            model = model_enum.getCOBRAmodel(toolDir=ARGS.tool_dir)
        except Exception as e:
            # Wrap/normalize load errors as DataErr for consistency
            raise utils.DataErr(ARGS.model, f"failed loading built-in model: {e}")

    # Determine final model name: explicit --name overrides, otherwise use the model id

    # NOTE(review): model_name is computed but never used below; the gates that
    # follow compare ARGS.name directly. Verify whether they should use
    # ARGS.model (or model_name) instead.
    model_name = ARGS.name if ARGS.name else ARGS.model

    if ARGS.name == "ENGRO2" and ARGS.medium_selector != "Default":
        # Medium table: rows are exchange reactions, columns are named media.
        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
        # Galaxy passes underscores where the CSV column names use spaces.
        ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
        medium = df_mediums[[ARGS.medium_selector]]
        medium = medium[ARGS.medium_selector].to_dict()

        # Reset all medium reactions lower bound to zero
        for rxn_id, _ in model.medium.items():
            model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)

        # Apply selected medium uptake bounds (negative for uptake)
        for reaction, value in medium.items():
            if value is not None:
                model.reactions.get_by_id(reaction).lower_bound = -float(value)

    if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default":
        logging.basicConfig(level=logging.INFO)
        logger = logging.getLogger(__name__)

        # Translate GPR gene identifiers from HGNC symbols to the requested
        # nomenclature using the bundled human gene mapping table.
        model = modelUtils.translate_model_genes(
            model=model,
            mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv", dtype={'entrez_id': str}),
            target_nomenclature=ARGS.gene_format,
            source_nomenclature='HGNC_symbol',
            logger=logger
        )

    # generate data
    # (note: `medium` is intentionally re-bound here — it now holds a DataFrame
    # from get_medium, not the dict used for bound-setting above)
    rules = modelUtils.generate_rules(model, asParsed = False)
    reactions = modelUtils.generate_reactions(model, asParsed = False)
    bounds = modelUtils.generate_bounds(model)
    medium = modelUtils.get_medium(model)
    objective_function = modelUtils.extract_objective_coefficients(model)

    if ARGS.name == "ENGRO2":
        compartments = modelUtils.generate_compartments(model)

    df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"])
    df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"])

    df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
    df_medium = medium.rename(columns = {"reaction": "ReactionID"})
    df_medium["InMedium"] = True

    # Outer-merge everything on ReactionID so reactions missing from any one
    # source still appear in the final table.
    merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
    merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
    merged = merged.merge(objective_function, on = "ReactionID", how = "outer")
    if ARGS.name == "ENGRO2":
        merged = merged.merge(compartments, on = "ReactionID", how = "outer")
    merged = merged.merge(df_medium, on = "ReactionID", how = "left")

    # left-merge leaves NaN for non-medium reactions; normalize to False
    merged["InMedium"] = merged["InMedium"].fillna(False)

    # Medium reactions first in the output
    merged = merged.sort_values(by = "InMedium", ascending = False)

    if not ARGS.out_tabular:
        raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
    save_as_tabular_df(merged, ARGS.out_tabular)
    expected = ARGS.out_tabular

    # verify output exists and non-empty
    if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
        raise utils.DataErr(expected, "Output not created or empty")

    print("Metabolic_model_setting: completed successfully")
253 | |
# Script entry point when invoked directly (e.g., by the Galaxy tool wrapper).
if __name__ == '__main__':

    main()