"""
Script to generate a tabular file of a metabolic model (built-in or custom).

This script loads a COBRA model (built-in or custom), optionally applies
medium and gene nomenclature settings, derives reaction-related metadata
(GPR rules, formulas, bounds, objective coefficients, medium membership,
and compartments for ENGRO2), and writes a tabular summary.
"""

import os
import csv
import cobra
import argparse
import pandas as pd
import utils.general_utils as utils
from typing import Optional, Tuple, List
import utils.model_utils as modelUtils
import logging

ARGS : argparse.Namespace
def process_args(args: Optional[List[str]] = None) -> argparse.Namespace:
    """
    Parse command-line arguments for metabolic_model_setting.
    """

    parser = argparse.ArgumentParser(
        usage="%(prog)s [options]",
        description="Generate custom data from a given model"
    )

    parser.add_argument("--out_log", type=str, required=True,
                        help="Output log file")

    parser.add_argument("--model", type=str,
                        help="Built-in model identifier (e.g., ENGRO2, Recon, HMRcore)")
    parser.add_argument("--input", type=str,
                        help="Custom model file (JSON, XML, MAT, or YML)")
    parser.add_argument("--name", type=str, required=True,
                        help="Model name (default or custom)")

    parser.add_argument("--medium_selector", type=str, required=True,
                        help="Medium selection option")

    parser.add_argument("--gene_format", type=str, default="Default",
                        help="Gene nomenclature format: Default (original), ENSNG, HGNC_SYMBOL, HGNC_ID, ENTREZ")

    parser.add_argument("--out_tabular", type=str,
                        help="Output file for the merged dataset (tab-separated)")

    parser.add_argument("--tool_dir", type=str, default=os.path.dirname(__file__),
                        help="Tool directory (passed from Galaxy as $__tool_directory__)")

    return parser.parse_args(args)
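
# Example invocation (illustrative only; the script filename and all paths are
# assumptions, not taken from this repository):
#   python metabolic_model_setting.py --out_log setup.log --model ENGRO2 \
#       --name ENGRO2 --medium_selector Default --gene_format Default \
#       --out_tabular ENGRO2_summary.tsv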
 | 
| 
 | 

################################- INPUT DATA LOADING -################################
def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
    """
    Loads a custom model from a file in JSON, XML, MAT, or YML format.

    Args:
        file_path : The path to the file containing the custom model.
        ext : explicit file format override. Needed under Galaxy, where uploaded
            datasets do not keep their original file extension.

    Raises:
        DataErr : if the file is in an unsupported format or cannot be opened for any reason.

    Returns:
        cobra.Model : the model, if successfully opened.
    """
    ext = ext if ext else file_path.ext
    try:
        if ext is utils.FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())

        if ext is utils.FileFormat.JSON:
            return cobra.io.load_json_model(file_path.show())

        if ext is utils.FileFormat.MAT:
            return cobra.io.load_matlab_model(file_path.show())

        if ext is utils.FileFormat.YML:
            return cobra.io.load_yaml_model(file_path.show())

    except Exception as e:
        raise utils.DataErr(file_path, str(e))

    raise utils.DataErr(
        file_path,
        f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported."
    )
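
# Usage sketch (hypothetical path; FilePath/FileFormat come from utils.general_utils,
# as used above). Pass an explicit format when the on-disk name is uninformative,
# e.g. for a Galaxy-managed dataset:
#   model = load_custom_model(utils.FilePath.fromStrPath("dataset_001.dat"),
#                             utils.FileFormat.JSON)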
 | 
| 
 | 


###############################- FILE SAVING -################################
def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None:
    """
    Saves any dictionary-shaped data to a two-column, tab-separated file at the given
    file_path (a utils.FilePath).

    Args:
        data : the data to be written to the file.
        file_path : the path to the output file.
        fieldNames : the names of the two fields (columns) in the output file.

    Returns:
        None
    """
    with open(file_path.show(), 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames = fieldNames, dialect="excel-tab")
        writer.writeheader()

        for key, value in data.items():
            writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })

def save_as_csv(data :dict, file_path :str, fieldNames :Tuple[str, str]) -> None:
    """
    Saves any dictionary-shaped data to a two-column, tab-separated file at the given
    file_path (a plain string path).

    Args:
        data : the data to be written to the file.
        file_path : the path to the output file.
        fieldNames : the names of the two fields (columns) in the output file.

    Returns:
        None
    """
    with open(file_path, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames = fieldNames, dialect="excel-tab")
        writer.writeheader()

        for key, value in data.items():
            writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
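
# Both helpers above write a header row followed by one key/value pair per line,
# delimited by tabs ("excel-tab" dialect). Illustrative output for
# fieldNames = ("ReactionID", "GPR"), with made-up identifiers:
#   ReactionID<TAB>GPR
#   R_HEX1<TAB>gene_A and gene_B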
 | 
| 
 | 

def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
    """
    Save a pandas DataFrame as a tab-separated file, creating directories as needed.

    Args:
        df: The DataFrame to write.
        path: Destination file path (will be written as TSV).

    Raises:
        DataErr: If writing the output fails for any reason.

    Returns:
        None
    """
    try:
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        df.to_csv(path, sep="\t", index=False)
    except Exception as e:
        raise utils.DataErr(path, f"failed writing tabular output: {e}")
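
# Usage sketch (hypothetical output path):
#   save_as_tabular_df(pd.DataFrame({"ReactionID": ["R1"], "GPR": ["gene_A"]}),
#                      "out/summary.tsv")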
 | 
| 
 | 


###############################- ENTRY POINT -################################
def main(args: Optional[List[str]] = None) -> None:
    """
    Initialize and generate custom data based on the frontend input arguments.

    Returns:
        None
    """
    # Parse args from frontend (Galaxy XML)
    global ARGS
    ARGS = process_args(args)

    if ARGS.input:
        # Load a custom model from file
        model = load_custom_model(
            utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
    else:
        # Load a built-in model
        try:
            model_enum = utils.Model[ARGS.model]  # e.g., Model['ENGRO2']
        except KeyError:
            raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model)

        # Load built-in model (Model.getCOBRAmodel uses tool_dir to locate local models)
        try:
            model = model_enum.getCOBRAmodel(toolDir=ARGS.tool_dir)
        except Exception as e:
            # Wrap/normalize load errors as DataErr for consistency
            raise utils.DataErr(ARGS.model, f"failed loading built-in model: {e}")

    # Determine final model name: explicit --name overrides, otherwise use the model id
    model_name = ARGS.name if ARGS.name else ARGS.model
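
    # The medium table at {tool_dir}/local/medium/medium.csv is read with its first
    # column as the index, so it is assumed to be indexed by exchange-reaction ID with
    # one column per named medium; values are uptake rates applied below as negative
    # lower bounds. Illustrative layout (not taken from the real file):
    #   reaction        Default   RPMI 1640
    #   EX_glc__D_e     10.0      11.11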
 | 
| 
 | 
    if ARGS.name == "ENGRO2" and ARGS.medium_selector != "Default":
        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
        ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
        medium = df_mediums[[ARGS.medium_selector]]
        medium = medium[ARGS.medium_selector].to_dict()

        # Reset all medium reactions' lower bounds to zero
        for rxn_id, _ in model.medium.items():
            model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)

        # Apply selected medium uptake bounds (negative for uptake)
        for reaction, value in medium.items():
            if value is not None:
                model.reactions.get_by_id(reaction).lower_bound = -float(value)

    if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default":
        logging.basicConfig(level=logging.INFO)
        logger = logging.getLogger(__name__)

        model = modelUtils.translate_model_genes(
            model=model,
            mapping_df=pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv", dtype={'entrez_id': str}),
            target_nomenclature=ARGS.gene_format,
            source_nomenclature='HGNC_symbol',
            logger=logger
        )
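
    # Note: genes_human.csv is read with entrez_id forced to string and is assumed to
    # hold one row per gene with one column per nomenclature (HGNC symbol as the
    # source, plus the target formats listed for --gene_format); the exact column
    # names are defined by modelUtils.translate_model_genes, not by this script.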
 | 
| 
 | 

    # generate data
    rules = modelUtils.generate_rules(model, asParsed = False)
    reactions = modelUtils.generate_reactions(model, asParsed = False)
    bounds = modelUtils.generate_bounds(model)
    medium = modelUtils.get_medium(model)
    objective_function = modelUtils.extract_objective_coefficients(model)

    if ARGS.name == "ENGRO2":
        compartments = modelUtils.generate_compartments(model)

    df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"])
    df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"])

    df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
    df_medium = medium.rename(columns = {"reaction": "ReactionID"})
    df_medium["InMedium"] = True

    merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
    merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
    merged = merged.merge(objective_function, on = "ReactionID", how = "outer")
    if ARGS.name == "ENGRO2":
        merged = merged.merge(compartments, on = "ReactionID", how = "outer")
    merged = merged.merge(df_medium, on = "ReactionID", how = "left")

    merged["InMedium"] = merged["InMedium"].fillna(False)

    merged = merged.sort_values(by = "InMedium", ascending = False)
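
    # At this point "merged" holds one row per reaction: ReactionID, Formula, GPR,
    # whatever bound columns generate_bounds produced, the objective-coefficient
    # column from extract_objective_coefficients, compartments (ENGRO2 only), and
    # the boolean InMedium flag, with medium reactions sorted first.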
 | 
| 
 | 

    if not ARGS.out_tabular:
        raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required for tabular output", ARGS.out_tabular)
    save_as_tabular_df(merged, ARGS.out_tabular)
    expected = ARGS.out_tabular

    # Verify that the output exists and is non-empty
    if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
        raise utils.DataErr(expected, "Output not created or empty")

    print("Metabolic_model_setting: completed successfully")

if __name__ == '__main__':
    main()