Mercurial > repos > bimib > cobraxy
comparison COBRAxy/custom_data_generator_beta.py @ 456:a6e45049c1b9 draft
Uploaded
| author | francesco_lapi | 
|---|---|
| date | Fri, 12 Sep 2025 17:28:45 +0000 | 
| parents | c3bb75ce07e6 | 
| children | 
Comparison legend: equal | deleted | inserted | replaced
| 455:4e2bc80764b6 | 456:a6e45049c1b9 | 
|---|---|
| 1 """ | |
| 2 Custom data generator for COBRA models. | |
| 3 | |
| 4 This script loads a COBRA model (built-in or custom), optionally applies | |
| 5 medium and gene nomenclature settings, derives reaction-related metadata | |
| 6 (GPR rules, formulas, bounds, objective coefficients, medium membership, | |
| 7 and compartments for ENGRO2), and writes a tabular summary. | |
| 8 """ | |
| 9 | |
| 1 import os | 10 import os | 
| 2 import csv | 11 import csv | 
| 3 import cobra | 12 import cobra | 
| 4 import pickle | |
| 5 import argparse | 13 import argparse | 
| 6 import pandas as pd | 14 import pandas as pd | 
| 7 import utils.general_utils as utils | 15 import utils.general_utils as utils | 
| 8 import utils.rule_parsing as rulesUtils | 16 from typing import Optional, Tuple, List | 
| 9 from typing import Optional, Tuple, Union, List, Dict | |
| 10 import utils.reaction_parsing as reactionUtils | |
| 11 import utils.model_utils as modelUtils | 17 import utils.model_utils as modelUtils | 
| 12 import logging | 18 import logging | 
| 13 | 19 | 
| 14 ARGS : argparse.Namespace | 20 ARGS : argparse.Namespace | 
| 15 def process_args(args: List[str] = None) -> argparse.Namespace: | 21 def process_args(args: List[str] = None) -> argparse.Namespace: | 
| 48 return parser.parse_args(args) | 54 return parser.parse_args(args) | 
| 49 | 55 | 
| 50 ################################- INPUT DATA LOADING -################################ | 56 ################################- INPUT DATA LOADING -################################ | 
| 51 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: | 57 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: | 
| 52 """ | 58 """ | 
| 53 Loads a custom model from a file, either in JSON or XML format. | 59 Loads a custom model from a file, either in JSON, XML, MAT, or YML format. | 
| 54 | 60 | 
| 55 Args: | 61 Args: | 
| 56 file_path : The path to the file containing the custom model. | 62 file_path : The path to the file containing the custom model. | 
| 57 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. | 63 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. | 
| 58 | 64 | 
| 68 return cobra.io.read_sbml_model(file_path.show()) | 74 return cobra.io.read_sbml_model(file_path.show()) | 
| 69 | 75 | 
| 70 if ext is utils.FileFormat.JSON: | 76 if ext is utils.FileFormat.JSON: | 
| 71 return cobra.io.load_json_model(file_path.show()) | 77 return cobra.io.load_json_model(file_path.show()) | 
| 72 | 78 | 
| 79 if ext is utils.FileFormat.MAT: | |
| 80 return cobra.io.load_matlab_model(file_path.show()) | |
| 81 | |
| 82 if ext is utils.FileFormat.YML: | |
| 83 return cobra.io.load_yaml_model(file_path.show()) | |
| 84 | |
| 73 except Exception as e: raise utils.DataErr(file_path, e.__str__()) | 85 except Exception as e: raise utils.DataErr(file_path, e.__str__()) | 
| 74 raise utils.DataErr(file_path, | 86 raise utils.DataErr( | 
| 75 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") | 87 file_path, | 
| 88 f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported." | |
| 89 ) | |
| 76 | 90 | 
| 77 | 91 | 
| 78 ###############################- FILE SAVING -################################ | 92 ###############################- FILE SAVING -################################ | 
| 79 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 93 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 
| 80 """ | 94 """ | 
| 113 | 127 | 
| 114 for key, value in data.items(): | 128 for key, value in data.items(): | 
| 115 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) | 129 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) | 
| 116 | 130 | 
| 117 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None: | 131 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None: | 
| 132 """ | |
| 133 Save a pandas DataFrame as a tab-separated file, creating directories as needed. | |
| 134 | |
| 135 Args: | |
| 136 df: The DataFrame to write. | |
| 137 path: Destination file path (will be written as TSV). | |
| 138 | |
| 139 Raises: | |
| 140 DataErr: If writing the output fails for any reason. | |
| 141 | |
| 142 Returns: | |
| 143 None | |
| 144 """ | |
| 118 try: | 145 try: | 
| 119 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | 146 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | 
| 120 df.to_csv(path, sep="\t", index=False) | 147 df.to_csv(path, sep="\t", index=False) | 
| 121 except Exception as e: | 148 except Exception as e: | 
| 122 raise utils.DataErr(path, f"failed writing tabular output: {e}") | 149 raise utils.DataErr(path, f"failed writing tabular output: {e}") | 
| 123 | 150 | 
| 124 | 151 | 
| 125 ###############################- ENTRY POINT -################################ | 152 ###############################- ENTRY POINT -################################ | 
| 126 def main(args:List[str] = None) -> None: | 153 def main(args:List[str] = None) -> None: | 
| 127 """ | 154 """ | 
| 128 Initializes everything and sets the program in motion based on the fronted input arguments. | 155 Initialize and generate custom data based on the frontend input arguments. | 
| 129 | 156 | 
| 130 Returns: | 157 Returns: | 
| 131 None | 158 None | 
| 132 """ | 159 """ | 
| 133 # get args from frontend (related xml) | 160 # Parse args from frontend (Galaxy XML) | 
| 134 global ARGS | 161 global ARGS | 
| 135 ARGS = process_args(args) | 162 ARGS = process_args(args) | 
| 136 | 163 | 
| 137 | 164 | 
| 138 if ARGS.input: | 165 if ARGS.input: | 
| 139 # load custom model | 166 # Load a custom model from file | 
| 140 model = load_custom_model( | 167 model = load_custom_model( | 
| 141 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) | 168 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) | 
| 142 else: | 169 else: | 
| 143 # load built-in model | 170 # Load a built-in model | 
| 144 | 171 | 
| 145 try: | 172 try: | 
| 146 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] | 173 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] | 
| 147 except KeyError: | 174 except KeyError: | 
| 148 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model) | 175 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model) | 
| 162 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) | 189 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) | 
| 163 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") | 190 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") | 
| 164 medium = df_mediums[[ARGS.medium_selector]] | 191 medium = df_mediums[[ARGS.medium_selector]] | 
| 165 medium = medium[ARGS.medium_selector].to_dict() | 192 medium = medium[ARGS.medium_selector].to_dict() | 
| 166 | 193 | 
| 167 # Set all reactions to zero in the medium | 194 # Reset all medium reactions lower bound to zero | 
| 168 for rxn_id, _ in model.medium.items(): | 195 for rxn_id, _ in model.medium.items(): | 
| 169 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) | 196 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) | 
| 170 | 197 | 
| 171 # Set medium conditions | 198 # Apply selected medium uptake bounds (negative for uptake) | 
| 172 for reaction, value in medium.items(): | 199 for reaction, value in medium.items(): | 
| 173 if value is not None: | 200 if value is not None: | 
| 174 model.reactions.get_by_id(reaction).lower_bound = -float(value) | 201 model.reactions.get_by_id(reaction).lower_bound = -float(value) | 
| 175 | 202 | 
| 176 #if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": | |
| 177 # logging.basicConfig(level=logging.INFO) | |
| 178 # logger = logging.getLogger(__name__) | |
| 179 | |
| 180 #model = modelUtils.translate_model_genes( | |
| 181 # model=model, | |
| 182 # mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv"), dtype={'entrez_id': str}, | |
| 183 # target_nomenclature=ARGS.gene_format.replace("HGNC_", "HGNC "), | |
| 184 # source_nomenclature='HGNC_ID', | |
| 185 # logger=logger | |
| 186 #) | |
| 187 #model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) | |
| 188 | |
| 189 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": | 203 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": | 
| 190 logging.basicConfig(level=logging.INFO) | 204 logging.basicConfig(level=logging.INFO) | 
| 191 logger = logging.getLogger(__name__) | 205 logger = logging.getLogger(__name__) | 
| 192 | 206 | 
| 193 model = modelUtils.translate_model_genes( | 207 model = modelUtils.translate_model_genes( | 
| 211 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"]) | 225 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"]) | 
| 212 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"]) | 226 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"]) | 
| 213 | 227 | 
| 214 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 228 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 
| 215 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) | 229 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) | 
| 216 df_medium["InMedium"] = True # flag per indicare la presenza nel medium | 230 df_medium["InMedium"] = True | 
| 217 | 231 | 
| 218 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") | 232 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") | 
| 219 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") | 233 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") | 
| 220 merged = merged.merge(objective_function, on = "ReactionID", how = "outer") | 234 merged = merged.merge(objective_function, on = "ReactionID", how = "outer") | 
| 221 if ARGS.name == "ENGRO2": | 235 if ARGS.name == "ENGRO2": | 
| 224 | 238 | 
| 225 merged["InMedium"] = merged["InMedium"].fillna(False) | 239 merged["InMedium"] = merged["InMedium"].fillna(False) | 
| 226 | 240 | 
| 227 merged = merged.sort_values(by = "InMedium", ascending = False) | 241 merged = merged.sort_values(by = "InMedium", ascending = False) | 
| 228 | 242 | 
| 229 #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data") | |
| 230 | |
| 231 #merged.to_csv(out_file, sep = '\t', index = False) | |
| 232 | |
| 233 #### | |
| 234 | |
| 235 if not ARGS.out_tabular: | 243 if not ARGS.out_tabular: | 
| 236 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) | 244 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) | 
| 237 save_as_tabular_df(merged, ARGS.out_tabular) | 245 save_as_tabular_df(merged, ARGS.out_tabular) | 
| 238 expected = ARGS.out_tabular | 246 expected = ARGS.out_tabular | 
| 239 | 247 | 
| 240 # verify output exists and non-empty | 248 # verify output exists and non-empty | 
| 241 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: | 249 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: | 
| 242 raise utils.DataErr(expected, "Output non creato o vuoto") | 250 raise utils.DataErr(expected, "Output not created or empty") | 
| 243 | 251 | 
| 244 print("CustomDataGenerator: completed successfully") | 252 print("CustomDataGenerator: completed successfully") | 
| 245 | 253 | 
| 246 if __name__ == '__main__': | 254 if __name__ == '__main__': | 
| 247 main() | 255 main() | 
