comparison COBRAxy/custom_data_generator_beta.py @ 456:a6e45049c1b9 draft default tip

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 17:28:45 +0000
parents c3bb75ce07e6
children
comparison
equal deleted inserted replaced
455:4e2bc80764b6 456:a6e45049c1b9
1 """
2 Custom data generator for COBRA models.
3
4 This script loads a COBRA model (built-in or custom), optionally applies
5 medium and gene nomenclature settings, derives reaction-related metadata
6 (GPR rules, formulas, bounds, objective coefficients, medium membership,
7 and compartments for ENGRO2), and writes a tabular summary.
8 """
9
1 import os 10 import os
2 import csv 11 import csv
3 import cobra 12 import cobra
4 import pickle
5 import argparse 13 import argparse
6 import pandas as pd 14 import pandas as pd
7 import utils.general_utils as utils 15 import utils.general_utils as utils
8 import utils.rule_parsing as rulesUtils 16 from typing import Optional, Tuple, List
9 from typing import Optional, Tuple, Union, List, Dict
10 import utils.reaction_parsing as reactionUtils
11 import utils.model_utils as modelUtils 17 import utils.model_utils as modelUtils
12 import logging 18 import logging
13 19
14 ARGS : argparse.Namespace 20 ARGS : argparse.Namespace
15 def process_args(args: List[str] = None) -> argparse.Namespace: 21 def process_args(args: List[str] = None) -> argparse.Namespace:
48 return parser.parse_args(args) 54 return parser.parse_args(args)
49 55
50 ################################- INPUT DATA LOADING -################################ 56 ################################- INPUT DATA LOADING -################################
51 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: 57 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
52 """ 58 """
53 Loads a custom model from a file, either in JSON or XML format. 59 Loads a custom model from a file, either in JSON, XML, MAT, or YML format.
54 60
55 Args: 61 Args:
56 file_path : The path to the file containing the custom model. 62 file_path : The path to the file containing the custom model.
57 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. 63 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour.
58 64
68 return cobra.io.read_sbml_model(file_path.show()) 74 return cobra.io.read_sbml_model(file_path.show())
69 75
70 if ext is utils.FileFormat.JSON: 76 if ext is utils.FileFormat.JSON:
71 return cobra.io.load_json_model(file_path.show()) 77 return cobra.io.load_json_model(file_path.show())
72 78
79 if ext is utils.FileFormat.MAT:
80 return cobra.io.load_matlab_model(file_path.show())
81
82 if ext is utils.FileFormat.YML:
83 return cobra.io.load_yaml_model(file_path.show())
84
73 except Exception as e: raise utils.DataErr(file_path, e.__str__()) 85 except Exception as e: raise utils.DataErr(file_path, e.__str__())
74 raise utils.DataErr(file_path, 86 raise utils.DataErr(
75 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") 87 file_path,
88 f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported."
89 )
76 90
77 91
78 ###############################- FILE SAVING -################################ 92 ###############################- FILE SAVING -################################
79 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: 93 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None:
80 """ 94 """
113 127
114 for key, value in data.items(): 128 for key, value in data.items():
115 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) 129 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
116 130
def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
    """
    Write a DataFrame to *path* as a tab-separated file.

    Any missing parent directories are created first. The write itself is
    delegated to pandas; any failure (bad path, permissions, disk full, ...)
    is re-raised as a DataErr pointing at the destination path.

    Args:
        df: The DataFrame to serialize.
        path: Destination file path (written as TSV, no index column).

    Raises:
        DataErr: If directory creation or the write fails for any reason.

    Returns:
        None
    """
    try:
        parent = os.path.dirname(path)
        # An empty dirname means the current directory; makedirs("") would fail.
        os.makedirs(parent if parent else ".", exist_ok=True)
        df.to_csv(path, sep="\t", index=False)
    except Exception as e:
        raise utils.DataErr(path, f"failed writing tabular output: {e}")
123 150
124 151
125 ###############################- ENTRY POINT -################################ 152 ###############################- ENTRY POINT -################################
126 def main(args:List[str] = None) -> None: 153 def main(args:List[str] = None) -> None:
127 """ 154 """
128 Initializes everything and sets the program in motion based on the fronted input arguments. 155 Initialize and generate custom data based on the frontend input arguments.
129 156
130 Returns: 157 Returns:
131 None 158 None
132 """ 159 """
133 # get args from frontend (related xml) 160 # Parse args from frontend (Galaxy XML)
134 global ARGS 161 global ARGS
135 ARGS = process_args(args) 162 ARGS = process_args(args)
136 163
137 164
138 if ARGS.input: 165 if ARGS.input:
139 # load custom model 166 # Load a custom model from file
140 model = load_custom_model( 167 model = load_custom_model(
141 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) 168 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
142 else: 169 else:
143 # load built-in model 170 # Load a built-in model
144 171
145 try: 172 try:
146 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] 173 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2']
147 except KeyError: 174 except KeyError:
148 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model) 175 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model)
162 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) 189 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
163 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") 190 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
164 medium = df_mediums[[ARGS.medium_selector]] 191 medium = df_mediums[[ARGS.medium_selector]]
165 medium = medium[ARGS.medium_selector].to_dict() 192 medium = medium[ARGS.medium_selector].to_dict()
166 193
167 # Set all reactions to zero in the medium 194 # Reset all medium reactions lower bound to zero
168 for rxn_id, _ in model.medium.items(): 195 for rxn_id, _ in model.medium.items():
169 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) 196 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
170 197
171 # Set medium conditions 198 # Apply selected medium uptake bounds (negative for uptake)
172 for reaction, value in medium.items(): 199 for reaction, value in medium.items():
173 if value is not None: 200 if value is not None:
174 model.reactions.get_by_id(reaction).lower_bound = -float(value) 201 model.reactions.get_by_id(reaction).lower_bound = -float(value)
175 202
176 #if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default":
177 # logging.basicConfig(level=logging.INFO)
178 # logger = logging.getLogger(__name__)
179
180 #model = modelUtils.translate_model_genes(
181 # model=model,
182 # mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv"), dtype={'entrez_id': str},
183 # target_nomenclature=ARGS.gene_format.replace("HGNC_", "HGNC "),
184 # source_nomenclature='HGNC_ID',
185 # logger=logger
186 #)
187 #model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC "))
188
189 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": 203 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default":
190 logging.basicConfig(level=logging.INFO) 204 logging.basicConfig(level=logging.INFO)
191 logger = logging.getLogger(__name__) 205 logger = logging.getLogger(__name__)
192 206
193 model = modelUtils.translate_model_genes( 207 model = modelUtils.translate_model_genes(
211 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"]) 225 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"])
212 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"]) 226 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"])
213 227
214 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) 228 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
215 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) 229 df_medium = medium.rename(columns = {"reaction": "ReactionID"})
216 df_medium["InMedium"] = True # flag per indicare la presenza nel medium 230 df_medium["InMedium"] = True
217 231
218 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") 232 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
219 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") 233 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
220 merged = merged.merge(objective_function, on = "ReactionID", how = "outer") 234 merged = merged.merge(objective_function, on = "ReactionID", how = "outer")
221 if ARGS.name == "ENGRO2": 235 if ARGS.name == "ENGRO2":
224 238
225 merged["InMedium"] = merged["InMedium"].fillna(False) 239 merged["InMedium"] = merged["InMedium"].fillna(False)
226 240
227 merged = merged.sort_values(by = "InMedium", ascending = False) 241 merged = merged.sort_values(by = "InMedium", ascending = False)
228 242
229 #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data")
230
231 #merged.to_csv(out_file, sep = '\t', index = False)
232
233 ####
234
235 if not ARGS.out_tabular: 243 if not ARGS.out_tabular:
236 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) 244 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
237 save_as_tabular_df(merged, ARGS.out_tabular) 245 save_as_tabular_df(merged, ARGS.out_tabular)
238 expected = ARGS.out_tabular 246 expected = ARGS.out_tabular
239 247
240 # verify output exists and non-empty 248 # verify output exists and non-empty
241 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: 249 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
242 raise utils.DataErr(expected, "Output non creato o vuoto") 250 raise utils.DataErr(expected, "Output not created or empty")
243 251
244 print("CustomDataGenerator: completed successfully") 252 print("CustomDataGenerator: completed successfully")
245 253
246 if __name__ == '__main__': 254 if __name__ == '__main__':
247 main() 255 main()