comparison COBRAxy/custom_data_generator.py @ 406:187cee1a00e2 draft

Uploaded
author francesco_lapi
date Mon, 08 Sep 2025 14:44:15 +0000
parents 08f1ff359397
comparison 405:716b1a638fb5 -> 406:187cee1a00e2
8 import utils.rule_parsing as rulesUtils 8 import utils.rule_parsing as rulesUtils
9 from typing import Optional, Tuple, Union, List, Dict 9 from typing import Optional, Tuple, Union, List, Dict
10 import utils.reaction_parsing as reactionUtils 10 import utils.reaction_parsing as reactionUtils
11 11
12 ARGS : argparse.Namespace 12 ARGS : argparse.Namespace
13 def process_args(args: List[str] = None) -> argparse.Namespace: 13 def process_args(args:List[str] = None) -> argparse.Namespace:
14 """ 14 """
15 Parse command-line arguments for CustomDataGenerator. 15 Interfaces the script of a module with its frontend, making the user's choices for
16 """ 16 various parameters available as values in code.
17 17
18 Args:
19 args : Always obtained (in file) from sys.argv
20
21 Returns:
22 Namespace : An object containing the parsed arguments
23 """
18 parser = argparse.ArgumentParser( 24 parser = argparse.ArgumentParser(
19 usage="%(prog)s [options]", 25 usage = "%(prog)s [options]",
20 description="Generate custom data from a given model" 26 description = "generate custom data from a given model")
21 ) 27
22 28 parser.add_argument("-ol", "--out_log", type = str, required = True, help = "Output log")
23 parser.add_argument("--out_log", type=str, required=True, 29
24 help="Output log file") 30 parser.add_argument("-orules", "--out_rules", type = str, required = True, help = "Output rules")
25 31 parser.add_argument("-orxns", "--out_reactions", type = str, required = True, help = "Output reactions")
26 parser.add_argument("--model", type=str, 32 parser.add_argument("-omedium", "--out_medium", type = str, required = True, help = "Output medium")
27 help="Built-in model identifier (e.g., ENGRO2, Recon, HMRcore)") 33 parser.add_argument("-obnds", "--out_bounds", type = str, required = True, help = "Output bounds")
28 parser.add_argument("--input", type=str, 34
29 help="Custom model file (JSON or XML)") 35 parser.add_argument("-id", "--input", type = str, required = True, help = "Input model")
30 parser.add_argument("--name", type=str, required=True, 36 parser.add_argument("-mn", "--name", type = str, required = True, help = "Input model name")
31 help="Model name (default or custom)") 37 # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
32 38 parser.add_argument('-idop', '--output_path', type = str, default='result', help = 'output path for maps')
33 parser.add_argument("--medium_selector", type=str, required=True, 39 argsNamespace = parser.parse_args(args)
34 help="Medium selection option") 40 # ^ can't get this one to work from xml, there doesn't seem to be a way to get the directory attribute from the collection
35 41
36 parser.add_argument("--gene_format", type=str, default="Default", 42 return argsNamespace
37 help="Gene nomenclature format: Default (original), ENSNG, HGNC_SYMBOL, HGNC_ID, ENTREZ")
38
39 parser.add_argument("--out_tabular", type=str,
40 help="Output file for the merged dataset (CSV or XLSX)")
41
42 parser.add_argument("--tool_dir", type=str, default=os.path.dirname(__file__),
43 help="Tool directory (passed from Galaxy as $__tool_directory__)")
44
45
46 return parser.parse_args(args)
47 43
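For reference, the refactored parser above can be exercised directly with an argv-style list; a minimal sketch (the argument values are illustrative, not taken from a real Galaxy run):

# Minimal sketch: calling process_args with an explicit argument list
# (in Galaxy these values come from the tool XML instead).
sample_args = process_args([
    "--out_log", "run.log",
    "--model", "ENGRO2",
    "--name", "ENGRO2",
    "--medium_selector", "Default",
    "--out_tabular", "ENGRO2_custom_data.tsv",
])
print(sample_args.model, sample_args.gene_format)  # -> ENGRO2 Default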
48 ################################- INPUT DATA LOADING -################################ 44 ################################- INPUT DATA LOADING -################################
49 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: 45 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
50 """ 46 """
51 Loads a custom model from a file, either in JSON or XML format. 47 Loads a custom model from a file, either in JSON or XML format.
145 for reaction in model.reactions: 141 for reaction in model.reactions:
146 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] 142 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
147 return bounds 143 return bounds
148 144
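The loop above fills a reaction-indexed frame row by row; a tiny self-contained sketch of the same pattern (toy ids and values, and the lower_bound/upper_bound column names are an assumption, since the excerpt elides the frame's construction):

import pandas as pd

# Toy version of the bounds table built row by row via .loc
bounds = pd.DataFrame(columns=["lower_bound", "upper_bound"])
for rxn_id, lb, ub in [("R1", -1000.0, 1000.0), ("EX_glc__D_e", -10.0, 1000.0)]:
    bounds.loc[rxn_id] = [lb, ub]
# bounds now holds one row per reaction id, e.g. bounds.loc["R1"] -> [-1000.0, 1000.0]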
149 145
150
151 def generate_compartments(model: cobra.Model) -> pd.DataFrame:
152 """
153 Generates a DataFrame containing pathway annotation information for each reaction.
154 Creates columns for each pathway position (Pathway_1, Pathway_2, etc.)
155
156 Args:
157 model: the COBRA model to extract pathway annotations from.
158
159 Returns:
160 pd.DataFrame: DataFrame with ReactionID and pathway columns
161 """
162 pathway_data = []
163
164 # First pass: determine the maximum number of pathways any reaction has
165 max_pathways = 0
166 reaction_pathways = {}
167
168 for reaction in model.reactions:
169 # Normalize the 'pathways' annotation to a list (it may be a single string)
170 pathways = reaction.annotation.get('pathways', [])
171 if not isinstance(pathways, list):
172 pathways = [pathways]
173 reaction_pathways[reaction.id] = pathways
174 max_pathways = max(max_pathways, len(pathways))
175
176 # Create column names for pathways
177 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
178
179 # Second pass: create the data
180 for reaction_id, pathways in reaction_pathways.items():
181 row = {"ReactionID": reaction_id}
182
183 # Fill pathway columns
184 for i in range(max_pathways):
185 col_name = pathway_columns[i]
186 if i < len(pathways):
187 row[col_name] = pathways[i]
188 else:
189 row[col_name] = None # or "" if you prefer empty strings
190
191 pathway_data.append(row)
192
193 return pd.DataFrame(pathway_data)
194
195
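The padding logic above widens ragged pathway lists into a fixed set of columns; a minimal standalone sketch of the same idea (toy annotations, not real model data):

import pandas as pd

# Toy reaction -> pathways mapping with ragged lengths
reaction_pathways = {"R1": ["Glycolysis"], "R2": ["TCA cycle", "Oxidative phosphorylation"]}
max_pathways = max(len(p) for p in reaction_pathways.values())
cols = [f"Pathway_{i+1}" for i in range(max_pathways)]
rows = [
    {"ReactionID": rid, **{c: (p[i] if i < len(p) else None) for i, c in enumerate(cols)}}
    for rid, p in reaction_pathways.items()
]
print(pd.DataFrame(rows))
#   ReactionID   Pathway_1                  Pathway_2
# 0         R1  Glycolysis                       None
# 1         R2   TCA cycle  Oxidative phosphorylation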
196 ###############################- FILE SAVING -################################ 146 ###############################- FILE SAVING -################################
197 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: 147 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None:
198 """ 148 """
199 Saves any dictionary-shaped data in a .csv file created at the given file_path as FilePath. 149 Saves any dictionary-shaped data in a .csv file created at the given file_path as FilePath.
200 150
230 writer.writeheader() 180 writer.writeheader()
231 181
232 for key, value in data.items(): 182 for key, value in data.items():
233 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) 183 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
234 184
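A usage sketch for the helper above, assuming utils.FilePath.fromStrPath accepts a plain string path as it does elsewhere in this file (the dict contents are illustrative):

# Hypothetical call: write a {ReactionID: Rule} dict as a two-column CSV
rules = {"R1": "GeneA or GeneB", "R2": "GeneC"}
save_as_csv_filePath(rules, utils.FilePath.fromStrPath("rules.csv"),
                     fieldNames=("ReactionID", "Rule"))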
235 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
236 try:
237 os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
238 df.to_csv(path, sep="\t", index=False)
239 except Exception as e:
240 raise utils.DataErr(path, f"failed writing tabular output: {e}")
241
242
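The tabular writer can be exercised the same way; a small illustrative frame (parent directories are created by the helper as needed):

import pandas as pd

df = pd.DataFrame({"ReactionID": ["R1"], "Rule": ["GeneA or GeneB"]})
save_as_tabular_df(df, "out/ENGRO2_custom_data.tsv")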
243 ###############################- ENTRY POINT -################################ 185 ###############################- ENTRY POINT -################################
244 def main(args:List[str] = None) -> None: 186 def main(args:List[str] = None) -> None:
245 """ 187 """
246 Initializes everything and sets the program in motion based on the frontend input arguments. 188 Initializes everything and sets the program in motion based on the frontend input arguments.
247 189
250 """ 192 """
251 # get args from frontend (related xml) 193 # get args from frontend (related xml)
252 global ARGS 194 global ARGS
253 ARGS = process_args(args) 195 ARGS = process_args(args)
254 196
255 197 # this is the worst thing I've seen so far, congrats to the former MaREA devs for suggesting this!
256 if ARGS.input: 198 if os.path.isdir(ARGS.output_path) == False: os.makedirs(ARGS.output_path)
257 # load custom model 199
258 model = load_custom_model( 200 # load custom model
259 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) 201 model = load_custom_model(
260 else: 202 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
261 # load built-in model
262
263 try:
264 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2']
265 except KeyError:
266 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model)
267
268 # Load built-in model (Model.getCOBRAmodel uses tool_dir to locate local models)
269 try:
270 model = model_enum.getCOBRAmodel(toolDir=ARGS.tool_dir)
271 except Exception as e:
272 # Wrap/normalize load errors as DataErr for consistency
273 raise utils.DataErr(ARGS.model, f"failed loading built-in model: {e}")
274
275 # Determine final model name: explicit --name overrides, otherwise use the model id
276
277 model_name = ARGS.name if ARGS.name else ARGS.model
278
279 if ARGS.name == "ENGRO2" and ARGS.medium_selector != "Default":
280 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
281 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
282 medium = df_mediums[[ARGS.medium_selector]]
283 medium = medium[ARGS.medium_selector].to_dict()
284
285 # Set all reactions to zero in the medium
286 for rxn_id, _ in model.medium.items():
287 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
288
289 # Set medium conditions
290 for reaction, value in medium.items():
291 if value is not None:
292 model.reactions.get_by_id(reaction).lower_bound = -float(value)
293
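The sign convention used above is worth spelling out: in COBRA models uptake is encoded as a negative lower bound on an exchange reaction, so a medium value of 10 becomes lower_bound = -10, and setting the lower bound to 0 closes uptake without blocking secretion. A minimal sketch, given a loaded cobra.Model named model ("EX_glc__D_e" is a hypothetical exchange id, not one from this tool):

rxn = model.reactions.get_by_id("EX_glc__D_e")
rxn.lower_bound = -10.0   # negative lower bound = uptake allowed (up to 10 units)
rxn.lower_bound = 0.0     # zero lower bound = uptake closed, secretion still possible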
294 if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default":
295
296 model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC "))
297 203
298 # generate data 204 # generate data
299 rules = generate_rules(model, asParsed = False) 205 rules = generate_rules(model, asParsed = False)
300 reactions = generate_reactions(model, asParsed = False) 206 reactions = generate_reactions(model, asParsed = False)
301 bounds = generate_bounds(model) 207 bounds = generate_bounds(model)
302 medium = get_medium(model) 208 medium = get_medium(model)
303 if ARGS.name == "ENGRO2": 209
304 compartments = generate_compartments(model) 210 # save files out of collection: path coming from xml
305 211 save_as_csv(rules, ARGS.out_rules, ("ReactionID", "Rule"))
306 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) 212 save_as_csv(reactions, ARGS.out_reactions, ("ReactionID", "Reaction"))
307 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) 213 bounds.to_csv(ARGS.out_bounds, sep = '\t')
308 214 medium.to_csv(ARGS.out_medium, sep = '\t')
309 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
310 df_medium = medium.rename(columns = {"reaction": "ReactionID"})
311 df_medium["InMedium"] = True # flag per indicare la presenza nel medium
312
313 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
314 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
315 if ARGS.name == "ENGRO2":
316 merged = merged.merge(compartments, on = "ReactionID", how = "outer")
317 merged = merged.merge(df_medium, on = "ReactionID", how = "left")
318
319 merged["InMedium"] = merged["InMedium"].fillna(False)
320
321 merged = merged.sort_values(by = "InMedium", ascending = False)
322
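The outer merges above keep every reaction seen in any of the partial tables, while the final left merge plus fillna(False) marks medium membership; the same semantics on toy frames:

import pandas as pd

rxns = pd.DataFrame({"ReactionID": ["R1", "R2"], "Reaction": ["A -> B", "B -> C"]})
med = pd.DataFrame({"ReactionID": ["R2"], "InMedium": [True]})
merged = rxns.merge(med, on="ReactionID", how="left")
merged["InMedium"] = merged["InMedium"].fillna(False)
print(merged.sort_values(by="InMedium", ascending=False))
#   ReactionID Reaction  InMedium
# 1         R2   B -> C      True
# 0         R1   A -> B     False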
323 #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data")
324
325 #merged.to_csv(out_file, sep = '\t', index = False)
326
327
328 ####
329
330
331 if not ARGS.out_tabular:
332 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
333 save_as_tabular_df(merged, ARGS.out_tabular)
334 expected = ARGS.out_tabular
335
336 # verify output exists and non-empty
337 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
338 raise utils.DataErr(expected, "Output not created or empty")
339
340 print("CustomDataGenerator: completed successfully")
341 215
342 if __name__ == '__main__': 216 if __name__ == '__main__':
343 main() 217 main()