Mercurial > repos > bimib > cobraxy
comparison COBRAxy/custom_data_generator_beta.py @ 456:a6e45049c1b9 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 17:28:45 +0000 |
parents | c3bb75ce07e6 |
children |
comparison
equal
deleted
inserted
replaced
455:4e2bc80764b6 | 456:a6e45049c1b9 |
---|---|
1 """ | |
2 Custom data generator for COBRA models. | |
3 | |
4 This script loads a COBRA model (built-in or custom), optionally applies | |
5 medium and gene nomenclature settings, derives reaction-related metadata | |
6 (GPR rules, formulas, bounds, objective coefficients, medium membership, | |
7 and compartments for ENGRO2), and writes a tabular summary. | |
8 """ | |
9 | |
1 import os | 10 import os |
2 import csv | 11 import csv |
3 import cobra | 12 import cobra |
4 import pickle | |
5 import argparse | 13 import argparse |
6 import pandas as pd | 14 import pandas as pd |
7 import utils.general_utils as utils | 15 import utils.general_utils as utils |
8 import utils.rule_parsing as rulesUtils | 16 from typing import Optional, Tuple, List |
9 from typing import Optional, Tuple, Union, List, Dict | |
10 import utils.reaction_parsing as reactionUtils | |
11 import utils.model_utils as modelUtils | 17 import utils.model_utils as modelUtils |
12 import logging | 18 import logging |
13 | 19 |
14 ARGS : argparse.Namespace | 20 ARGS : argparse.Namespace |
15 def process_args(args: List[str] = None) -> argparse.Namespace: | 21 def process_args(args: List[str] = None) -> argparse.Namespace: |
48 return parser.parse_args(args) | 54 return parser.parse_args(args) |
49 | 55 |
50 ################################- INPUT DATA LOADING -################################ | 56 ################################- INPUT DATA LOADING -################################ |
51 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: | 57 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: |
52 """ | 58 """ |
53 Loads a custom model from a file, either in JSON or XML format. | 59 Loads a custom model from a file, either in JSON, XML, MAT, or YML format. |
54 | 60 |
55 Args: | 61 Args: |
56 file_path : The path to the file containing the custom model. | 62 file_path : The path to the file containing the custom model. |
57 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. | 63 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. |
58 | 64 |
68 return cobra.io.read_sbml_model(file_path.show()) | 74 return cobra.io.read_sbml_model(file_path.show()) |
69 | 75 |
70 if ext is utils.FileFormat.JSON: | 76 if ext is utils.FileFormat.JSON: |
71 return cobra.io.load_json_model(file_path.show()) | 77 return cobra.io.load_json_model(file_path.show()) |
72 | 78 |
79 if ext is utils.FileFormat.MAT: | |
80 return cobra.io.load_matlab_model(file_path.show()) | |
81 | |
82 if ext is utils.FileFormat.YML: | |
83 return cobra.io.load_yaml_model(file_path.show()) | |
84 | |
73 except Exception as e: raise utils.DataErr(file_path, e.__str__()) | 85 except Exception as e: raise utils.DataErr(file_path, e.__str__()) |
74 raise utils.DataErr(file_path, | 86 raise utils.DataErr( |
75 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") | 87 file_path, |
88 f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported." | |
89 ) | |
76 | 90 |
77 | 91 |
78 ###############################- FILE SAVING -################################ | 92 ###############################- FILE SAVING -################################ |
79 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 93 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: |
80 """ | 94 """ |
113 | 127 |
114 for key, value in data.items(): | 128 for key, value in data.items(): |
115 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) | 129 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) |
116 | 130 |
117 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None: | 131 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None: |
132 """ | |
133 Save a pandas DataFrame as a tab-separated file, creating directories as needed. | |
134 | |
135 Args: | |
136 df: The DataFrame to write. | |
137 path: Destination file path (will be written as TSV). | |
138 | |
139 Raises: | |
140 DataErr: If writing the output fails for any reason. | |
141 | |
142 Returns: | |
143 None | |
144 """ | |
118 try: | 145 try: |
119 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | 146 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
120 df.to_csv(path, sep="\t", index=False) | 147 df.to_csv(path, sep="\t", index=False) |
121 except Exception as e: | 148 except Exception as e: |
122 raise utils.DataErr(path, f"failed writing tabular output: {e}") | 149 raise utils.DataErr(path, f"failed writing tabular output: {e}") |
123 | 150 |
124 | 151 |
125 ###############################- ENTRY POINT -################################ | 152 ###############################- ENTRY POINT -################################ |
126 def main(args:List[str] = None) -> None: | 153 def main(args:List[str] = None) -> None: |
127 """ | 154 """ |
128 Initializes everything and sets the program in motion based on the fronted input arguments. | 155 Initialize and generate custom data based on the frontend input arguments. |
129 | 156 |
130 Returns: | 157 Returns: |
131 None | 158 None |
132 """ | 159 """ |
133 # get args from frontend (related xml) | 160 # Parse args from frontend (Galaxy XML) |
134 global ARGS | 161 global ARGS |
135 ARGS = process_args(args) | 162 ARGS = process_args(args) |
136 | 163 |
137 | 164 |
138 if ARGS.input: | 165 if ARGS.input: |
139 # load custom model | 166 # Load a custom model from file |
140 model = load_custom_model( | 167 model = load_custom_model( |
141 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) | 168 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) |
142 else: | 169 else: |
143 # load built-in model | 170 # Load a built-in model |
144 | 171 |
145 try: | 172 try: |
146 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] | 173 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] |
147 except KeyError: | 174 except KeyError: |
148 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model) | 175 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model) |
162 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) | 189 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) |
163 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") | 190 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") |
164 medium = df_mediums[[ARGS.medium_selector]] | 191 medium = df_mediums[[ARGS.medium_selector]] |
165 medium = medium[ARGS.medium_selector].to_dict() | 192 medium = medium[ARGS.medium_selector].to_dict() |
166 | 193 |
167 # Set all reactions to zero in the medium | 194 # Reset all medium reactions lower bound to zero |
168 for rxn_id, _ in model.medium.items(): | 195 for rxn_id, _ in model.medium.items(): |
169 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) | 196 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) |
170 | 197 |
171 # Set medium conditions | 198 # Apply selected medium uptake bounds (negative for uptake) |
172 for reaction, value in medium.items(): | 199 for reaction, value in medium.items(): |
173 if value is not None: | 200 if value is not None: |
174 model.reactions.get_by_id(reaction).lower_bound = -float(value) | 201 model.reactions.get_by_id(reaction).lower_bound = -float(value) |
175 | 202 |
176 #if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": | |
177 # logging.basicConfig(level=logging.INFO) | |
178 # logger = logging.getLogger(__name__) | |
179 | |
180 #model = modelUtils.translate_model_genes( | |
181 # model=model, | |
182 # mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv"), dtype={'entrez_id': str}, | |
183 # target_nomenclature=ARGS.gene_format.replace("HGNC_", "HGNC "), | |
184 # source_nomenclature='HGNC_ID', | |
185 # logger=logger | |
186 #) | |
187 #model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) | |
188 | |
189 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": | 203 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": |
190 logging.basicConfig(level=logging.INFO) | 204 logging.basicConfig(level=logging.INFO) |
191 logger = logging.getLogger(__name__) | 205 logger = logging.getLogger(__name__) |
192 | 206 |
193 model = modelUtils.translate_model_genes( | 207 model = modelUtils.translate_model_genes( |
211 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"]) | 225 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"]) |
212 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"]) | 226 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"]) |
213 | 227 |
214 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 228 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) |
215 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) | 229 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) |
216 df_medium["InMedium"] = True # flag per indicare la presenza nel medium | 230 df_medium["InMedium"] = True |
217 | 231 |
218 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") | 232 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") |
219 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") | 233 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") |
220 merged = merged.merge(objective_function, on = "ReactionID", how = "outer") | 234 merged = merged.merge(objective_function, on = "ReactionID", how = "outer") |
221 if ARGS.name == "ENGRO2": | 235 if ARGS.name == "ENGRO2": |
224 | 238 |
225 merged["InMedium"] = merged["InMedium"].fillna(False) | 239 merged["InMedium"] = merged["InMedium"].fillna(False) |
226 | 240 |
227 merged = merged.sort_values(by = "InMedium", ascending = False) | 241 merged = merged.sort_values(by = "InMedium", ascending = False) |
228 | 242 |
229 #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data") | |
230 | |
231 #merged.to_csv(out_file, sep = '\t', index = False) | |
232 | |
233 #### | |
234 | |
235 if not ARGS.out_tabular: | 243 if not ARGS.out_tabular: |
236 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) | 244 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) |
237 save_as_tabular_df(merged, ARGS.out_tabular) | 245 save_as_tabular_df(merged, ARGS.out_tabular) |
238 expected = ARGS.out_tabular | 246 expected = ARGS.out_tabular |
239 | 247 |
240 # verify output exists and non-empty | 248 # verify output exists and non-empty |
241 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: | 249 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: |
242 raise utils.DataErr(expected, "Output non creato o vuoto") | 250 raise utils.DataErr(expected, "Output not created or empty") |
243 | 251 |
244 print("CustomDataGenerator: completed successfully") | 252 print("CustomDataGenerator: completed successfully") |
245 | 253 |
if __name__ == '__main__':
    # Entry point: run the generator when executed as a script.
    main()