diff COBRAxy/custom_data_generator.py @ 406:187cee1a00e2 draft

Uploaded
author francesco_lapi
date Mon, 08 Sep 2025 14:44:15 +0000
parents 08f1ff359397
children
line wrap: on
line diff
--- a/COBRAxy/custom_data_generator.py	Mon Sep 08 13:52:58 2025 +0000
+++ b/COBRAxy/custom_data_generator.py	Mon Sep 08 14:44:15 2025 +0000
@@ -10,40 +10,36 @@
 import utils.reaction_parsing as reactionUtils
 
 ARGS : argparse.Namespace
-def process_args(args: List[str] = None) -> argparse.Namespace:
+def process_args(args:List[str] = None) -> argparse.Namespace:
     """
-    Parse command-line arguments for CustomDataGenerator.
-    """
+    Interfaces the script of a module with its frontend, making the user's choices for
+    various parameters available as values in code.
 
-    parser = argparse.ArgumentParser(
-        usage="%(prog)s [options]",
-        description="Generate custom data from a given model"
-    )
-
-    parser.add_argument("--out_log", type=str, required=True,
-                        help="Output log file")
+    Args:
+        args : Always obtained (in file) from sys.argv
 
-    parser.add_argument("--model", type=str,
-                        help="Built-in model identifier (e.g., ENGRO2, Recon, HMRcore)")
-    parser.add_argument("--input", type=str,
-                        help="Custom model file (JSON or XML)")
-    parser.add_argument("--name", type=str, required=True,
-                        help="Model name (default or custom)")
+    Returns:
+        Namespace : An object containing the parsed arguments
+    """
+    parser = argparse.ArgumentParser(
+        usage = "%(prog)s [options]",
+        description = "generate custom data from a given model")
     
-    parser.add_argument("--medium_selector", type=str, required=True,
-                        help="Medium selection option")
+    parser.add_argument("-ol", "--out_log", type = str, required = True, help = "Output log")
 
-    parser.add_argument("--gene_format", type=str, default="Default",
-                        help="Gene nomenclature format: Default (original), ENSNG, HGNC_SYMBOL, HGNC_ID, ENTREZ")
-    
-    parser.add_argument("--out_tabular", type=str,
-                        help="Output file for the merged dataset (CSV or XLSX)")
-    
-    parser.add_argument("--tool_dir", type=str, default=os.path.dirname(__file__),
-                        help="Tool directory (passed from Galaxy as $__tool_directory__)")
+    parser.add_argument("-orules", "--out_rules", type = str, required = True, help = "Output rules")
+    parser.add_argument("-orxns", "--out_reactions", type = str, required = True, help = "Output reactions")
+    parser.add_argument("-omedium", "--out_medium", type = str, required = True, help = "Output medium")
+    parser.add_argument("-obnds", "--out_bounds", type = str, required = True, help = "Output bounds")
 
+    parser.add_argument("-id", "--input",   type = str, required = True, help = "Input model")
+    parser.add_argument("-mn", "--name",    type = str, required = True, help = "Input model name")
+    # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
+    parser.add_argument('-idop', '--output_path', type = str, default='result', help = 'output path for maps')
+    argsNamespace = parser.parse_args(args)
+    # ^ can't get this one to work from xml, there doesn't seem to be a way to get the directory attribute from the collection
 
-    return parser.parse_args(args)
+    return argsNamespace
 
 ################################- INPUT DATA LOADING -################################
 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
@@ -147,52 +143,6 @@
     return bounds
 
 
-
-def generate_compartments(model: cobra.Model) -> pd.DataFrame:
-    """
-    Generates a DataFrame containing compartment information for each reaction.
-    Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
-    
-    Args:
-        model: the COBRA model to extract compartment data from.
-        
-    Returns:
-        pd.DataFrame: DataFrame with ReactionID and compartment columns
-    """
-    pathway_data = []
-
-    # First pass: determine the maximum number of pathways any reaction has
-    max_pathways = 0
-    reaction_pathways = {}
-
-    for reaction in model.reactions:
-        # Get unique pathways from all metabolites in the reaction
-        if type(reaction.annotation['pathways']) == list:
-            reaction_pathways[reaction.id] = reaction.annotation['pathways']
-            max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
-        else:
-            reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
-
-    # Create column names for pathways
-    pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
-
-    # Second pass: create the data
-    for reaction_id, pathways in reaction_pathways.items():
-        row = {"ReactionID": reaction_id}
-        
-        # Fill pathway columns
-        for i in range(max_pathways):
-            col_name = pathway_columns[i]
-            if i < len(pathways):
-                row[col_name] = pathways[i]
-            else:
-                row[col_name] = None  # or "" if you prefer empty strings
-
-        pathway_data.append(row)
-
-    return pd.DataFrame(pathway_data)
-
-
 ###############################- FILE SAVING -################################
 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None:
     """
@@ -232,14 +182,6 @@
         for key, value in data.items():
             writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
 
-def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
-    try:
-        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
-        df.to_csv(path, sep="\t", index=False)
-    except Exception as e:
-        raise utils.DataErr(path, f"failed writing tabular output: {e}")
-
-
 ###############################- ENTRY POINT -################################
 def main(args:List[str] = None) -> None:
     """
@@ -252,92 +194,24 @@
     global ARGS
     ARGS = process_args(args)
 
-
-    if ARGS.input:
-        # load custom model
-        model = load_custom_model(
-            utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
-    else:
-        # load built-in model
-
-        try:
-            model_enum = utils.Model[ARGS.model]  # e.g., Model['ENGRO2']
-        except KeyError:
-            raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model)
-
-        # Load built-in model (Model.getCOBRAmodel uses tool_dir to locate local models)
-        try:
-            model = model_enum.getCOBRAmodel(toolDir=ARGS.tool_dir)
-        except Exception as e:
-            # Wrap/normalize load errors as DataErr for consistency
-            raise utils.DataErr(ARGS.model, f"failed loading built-in model: {e}")
+    # this is the worst thing I've seen so far, congrats to the former MaREA devs for suggesting this!
+    if os.path.isdir(ARGS.output_path) == False: os.makedirs(ARGS.output_path)
 
-    # Determine final model name: explicit --name overrides, otherwise use the model id
-    
-    model_name = ARGS.name if ARGS.name else ARGS.model
-    
-    if ARGS.name == "ENGRO2" and ARGS.medium_selector != "Default":
-        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
-        ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
-        medium = df_mediums[[ARGS.medium_selector]]
-        medium = medium[ARGS.medium_selector].to_dict()
-
-        # Set all reactions to zero in the medium
-        for rxn_id, _ in model.medium.items():
-            model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
-        
-        # Set medium conditions
-        for reaction, value in medium.items():
-            if value is not None:
-                model.reactions.get_by_id(reaction).lower_bound = -float(value)
-
-    if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default":
-
-        model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC "))
+    # load custom model
+    model = load_custom_model(
+        utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
 
     # generate data
     rules = generate_rules(model, asParsed = False)
     reactions = generate_reactions(model, asParsed = False)
     bounds = generate_bounds(model)
     medium = get_medium(model)
-    if ARGS.name == "ENGRO2":
-        compartments = generate_compartments(model)
 
-    df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"])
-    df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"])
-
-    df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
-    df_medium = medium.rename(columns = {"reaction": "ReactionID"})
-    df_medium["InMedium"] = True # flag per indicare la presenza nel medium
-
-    merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
-    merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
-    if ARGS.name == "ENGRO2": 
-        merged = merged.merge(compartments, on = "ReactionID", how = "outer")
-    merged = merged.merge(df_medium, on = "ReactionID", how = "left")
-
-    merged["InMedium"] = merged["InMedium"].fillna(False)
-
-    merged = merged.sort_values(by = "InMedium", ascending = False)
-
-    #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data")
-
-    #merged.to_csv(out_file, sep = '\t', index = False)
-
-
-    ####
-
-
-    if not ARGS.out_tabular:
-        raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
-    save_as_tabular_df(merged, ARGS.out_tabular)
-    expected = ARGS.out_tabular
-
-    # verify output exists and non-empty
-    if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
-        raise utils.DataErr(expected, "Output non creato o vuoto")
-
-    print("CustomDataGenerator: completed successfully")
+    # save files out of collection: path coming from xml
+    save_as_csv(rules, ARGS.out_rules, ("ReactionID", "Rule"))
+    save_as_csv(reactions, ARGS.out_reactions, ("ReactionID", "Reaction"))
+    bounds.to_csv(ARGS.out_bounds, sep = '\t')
+    medium.to_csv(ARGS.out_medium, sep = '\t')
 
 if __name__ == '__main__':
     main()
\ No newline at end of file