diff COBRAxy/custom_data_generator_beta.py @ 456:a6e45049c1b9 draft default tip

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 17:28:45 +0000
parents c3bb75ce07e6
children
line wrap: on
line diff
--- a/COBRAxy/custom_data_generator_beta.py	Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/custom_data_generator_beta.py	Fri Sep 12 17:28:45 2025 +0000
@@ -1,13 +1,19 @@
+"""
+Custom data generator for COBRA models.
+
+This script loads a COBRA model (built-in or custom), optionally applies
+medium and gene nomenclature settings, derives reaction-related metadata
+(GPR rules, formulas, bounds, objective coefficients, medium membership,
+and compartments for ENGRO2), and writes a tabular summary.
+"""
+
 import os
 import csv
 import cobra
-import pickle
 import argparse
 import pandas as pd
 import utils.general_utils as utils
-import utils.rule_parsing  as rulesUtils
-from typing import Optional, Tuple, Union, List, Dict
-import utils.reaction_parsing as reactionUtils
+from typing import Optional, Tuple, List
 import utils.model_utils as modelUtils
 import logging
 
@@ -50,7 +56,7 @@
 ################################- INPUT DATA LOADING -################################
 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
     """
-    Loads a custom model from a file, either in JSON or XML format.
+    Loads a custom model from a file, either in JSON, XML, MAT, or YML format.
 
     Args:
         file_path : The path to the file containing the custom model.
@@ -70,9 +76,17 @@
         if ext is utils.FileFormat.JSON:
             return cobra.io.load_json_model(file_path.show())
 
+        if ext is utils.FileFormat.MAT:
+            return cobra.io.load_matlab_model(file_path.show())
+
+        if ext is utils.FileFormat.YML:
+            return cobra.io.load_yaml_model(file_path.show())
+
     except Exception as e: raise utils.DataErr(file_path, e.__str__())
-    raise utils.DataErr(file_path,
-        f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML")
+    raise utils.DataErr(
+        file_path,
+        f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported."
+    )
 
 
 ###############################- FILE SAVING -################################
@@ -115,6 +129,19 @@
             writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
 
 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
+    """
+    Save a pandas DataFrame as a tab-separated file, creating directories as needed.
+
+    Args:
+        df: The DataFrame to write.
+        path: Destination file path (will be written as TSV).
+
+    Raises:
+        DataErr: If writing the output fails for any reason.
+
+    Returns:
+        None
+    """
     try:
         os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
         df.to_csv(path, sep="\t", index=False)
@@ -125,22 +152,22 @@
 ###############################- ENTRY POINT -################################
 def main(args:List[str] = None) -> None:
     """
-    Initializes everything and sets the program in motion based on the fronted input arguments.
+    Initialize and generate custom data based on the frontend input arguments.
     
     Returns:
         None
     """
-    # get args from frontend (related xml)
+    # Parse args from frontend (Galaxy XML)
     global ARGS
     ARGS = process_args(args)
 
 
     if ARGS.input:
-        # load custom model
+        # Load a custom model from file
         model = load_custom_model(
             utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
     else:
-        # load built-in model
+        # Load a built-in model
 
         try:
             model_enum = utils.Model[ARGS.model]  # e.g., Model['ENGRO2']
@@ -164,28 +191,15 @@
         medium = df_mediums[[ARGS.medium_selector]]
         medium = medium[ARGS.medium_selector].to_dict()
 
-        # Set all reactions to zero in the medium
+        # Reset all medium reactions lower bound to zero
         for rxn_id, _ in model.medium.items():
             model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
         
-        # Set medium conditions
+        # Apply selected medium uptake bounds (negative for uptake)
         for reaction, value in medium.items():
             if value is not None:
                 model.reactions.get_by_id(reaction).lower_bound = -float(value)
 
-    #if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default":
-    #    logging.basicConfig(level=logging.INFO)
-    #    logger = logging.getLogger(__name__)
-
-        #model = modelUtils.translate_model_genes(
-        #    model=model,
-        #    mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv"), dtype={'entrez_id': str},
-        #    target_nomenclature=ARGS.gene_format.replace("HGNC_", "HGNC "),
-        #    source_nomenclature='HGNC_ID',
-        #    logger=logger
-        #)
-        #model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC "))
-    
     if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default":
         logging.basicConfig(level=logging.INFO)
         logger = logging.getLogger(__name__)
@@ -213,7 +227,7 @@
 
     df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
     df_medium = medium.rename(columns = {"reaction": "ReactionID"})
-    df_medium["InMedium"] = True # flag per indicare la presenza nel medium
+    df_medium["InMedium"] = True
 
     merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
     merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
@@ -226,12 +240,6 @@
 
     merged = merged.sort_values(by = "InMedium", ascending = False)
 
-    #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data")
-
-    #merged.to_csv(out_file, sep = '\t', index = False)
-
-    ####
-
     if not ARGS.out_tabular:
         raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
     save_as_tabular_df(merged, ARGS.out_tabular)
@@ -239,7 +247,7 @@
 
     # verify output exists and non-empty
     if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
-        raise utils.DataErr(expected, "Output non creato o vuoto")
+        raise utils.DataErr(expected, "Output not created or empty")
 
     print("CustomDataGenerator: completed successfully")