comparison COBRAxy/custom_data_generator_beta.py @ 456:a6e45049c1b9 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Fri, 12 Sep 2025 17:28:45 +0000 |
| parents | c3bb75ce07e6 |
| children | |
| 455:4e2bc80764b6 (previous) | 456:a6e45049c1b9 (this revision) |
|---|---|
| 1 """ | |
| 2 Custom data generator for COBRA models. | |
| 3 | |
| 4 This script loads a COBRA model (built-in or custom), optionally applies | |
| 5 medium and gene nomenclature settings, derives reaction-related metadata | |
| 6 (GPR rules, formulas, bounds, objective coefficients, medium membership, | |
| 7 and compartments for ENGRO2), and writes a tabular summary. | |
| 8 """ | |
| 9 | |
| 1 import os | 10 import os |
| 2 import csv | 11 import csv |
| 3 import cobra | 12 import cobra |
| 4 import pickle | |
| 5 import argparse | 13 import argparse |
| 6 import pandas as pd | 14 import pandas as pd |
| 7 import utils.general_utils as utils | 15 import utils.general_utils as utils |
| 8 import utils.rule_parsing as rulesUtils | 16 from typing import Optional, Tuple, List |
| 9 from typing import Optional, Tuple, Union, List, Dict | |
| 10 import utils.reaction_parsing as reactionUtils | |
| 11 import utils.model_utils as modelUtils | 17 import utils.model_utils as modelUtils |
| 12 import logging | 18 import logging |
| 13 | 19 |
| 14 ARGS : argparse.Namespace | 20 ARGS : argparse.Namespace |
| 15 def process_args(args: List[str] = None) -> argparse.Namespace: | 21 def process_args(args: List[str] = None) -> argparse.Namespace: |
| 48 return parser.parse_args(args) | 54 return parser.parse_args(args) |
| 49 | 55 |
| 50 ################################- INPUT DATA LOADING -################################ | 56 ################################- INPUT DATA LOADING -################################ |
| 51 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: | 57 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: |
| 52 """ | 58 """ |
| 53 Loads a custom model from a file, either in JSON or XML format. | 59 Loads a custom model from a file, either in JSON, XML, MAT, or YML format. |
| 54 | 60 |
| 55 Args: | 61 Args: |
| 56 file_path : The path to the file containing the custom model. | 62 file_path : The path to the file containing the custom model. |
| 57 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. | 63 ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. |
| 58 | 64 |
| 68 return cobra.io.read_sbml_model(file_path.show()) | 74 return cobra.io.read_sbml_model(file_path.show()) |
| 69 | 75 |
| 70 if ext is utils.FileFormat.JSON: | 76 if ext is utils.FileFormat.JSON: |
| 71 return cobra.io.load_json_model(file_path.show()) | 77 return cobra.io.load_json_model(file_path.show()) |
| 72 | 78 |
| 79 if ext is utils.FileFormat.MAT: | |
| 80 return cobra.io.load_matlab_model(file_path.show()) | |
| 81 | |
| 82 if ext is utils.FileFormat.YML: | |
| 83 return cobra.io.load_yaml_model(file_path.show()) | |
| 84 | |
| 73 except Exception as e: raise utils.DataErr(file_path, e.__str__()) | 85 except Exception as e: raise utils.DataErr(file_path, e.__str__()) |
| 74 raise utils.DataErr(file_path, | 86 raise utils.DataErr( |
| 75 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") | 87 file_path, |
| 88 f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported." | |
| 89 ) | |
| 76 | 90 |
| 77 | 91 |
| 78 ###############################- FILE SAVING -################################ | 92 ###############################- FILE SAVING -################################ |
| 79 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 93 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: |
| 80 """ | 94 """ |
| 113 | 127 |
| 114 for key, value in data.items(): | 128 for key, value in data.items(): |
| 115 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) | 129 writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) |
| 116 | 130 |
| 117 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None: | 131 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None: |
| 132 """ | |
| 133 Save a pandas DataFrame as a tab-separated file, creating directories as needed. | |
| 134 | |
| 135 Args: | |
| 136 df: The DataFrame to write. | |
| 137 path: Destination file path (will be written as TSV). | |
| 138 | |
| 139 Raises: | |
| 140 DataErr: If writing the output fails for any reason. | |
| 141 | |
| 142 Returns: | |
| 143 None | |
| 144 """ | |
| 118 try: | 145 try: |
| 119 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | 146 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
| 120 df.to_csv(path, sep="\t", index=False) | 147 df.to_csv(path, sep="\t", index=False) |
| 121 except Exception as e: | 148 except Exception as e: |
| 122 raise utils.DataErr(path, f"failed writing tabular output: {e}") | 149 raise utils.DataErr(path, f"failed writing tabular output: {e}") |
| 123 | 150 |
| 124 | 151 |
| 125 ###############################- ENTRY POINT -################################ | 152 ###############################- ENTRY POINT -################################ |
| 126 def main(args:List[str] = None) -> None: | 153 def main(args:List[str] = None) -> None: |
| 127 """ | 154 """ |
| 128 Initializes everything and sets the program in motion based on the fronted input arguments. | 155 Initialize and generate custom data based on the frontend input arguments. |
| 129 | 156 |
| 130 Returns: | 157 Returns: |
| 131 None | 158 None |
| 132 """ | 159 """ |
| 133 # get args from frontend (related xml) | 160 # Parse args from frontend (Galaxy XML) |
| 134 global ARGS | 161 global ARGS |
| 135 ARGS = process_args(args) | 162 ARGS = process_args(args) |
| 136 | 163 |
| 137 | 164 |
| 138 if ARGS.input: | 165 if ARGS.input: |
| 139 # load custom model | 166 # Load a custom model from file |
| 140 model = load_custom_model( | 167 model = load_custom_model( |
| 141 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) | 168 utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) |
| 142 else: | 169 else: |
| 143 # load built-in model | 170 # Load a built-in model |
| 144 | 171 |
| 145 try: | 172 try: |
| 146 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] | 173 model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] |
| 147 except KeyError: | 174 except KeyError: |
| 148 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model) | 175 raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model) |
| 162 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) | 189 df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) |
| 163 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") | 190 ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") |
| 164 medium = df_mediums[[ARGS.medium_selector]] | 191 medium = df_mediums[[ARGS.medium_selector]] |
| 165 medium = medium[ARGS.medium_selector].to_dict() | 192 medium = medium[ARGS.medium_selector].to_dict() |
| 166 | 193 |
| 167 # Set all reactions to zero in the medium | 194 # Reset all medium reactions lower bound to zero |
| 168 for rxn_id, _ in model.medium.items(): | 195 for rxn_id, _ in model.medium.items(): |
| 169 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) | 196 model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) |
| 170 | 197 |
| 171 # Set medium conditions | 198 # Apply selected medium uptake bounds (negative for uptake) |
| 172 for reaction, value in medium.items(): | 199 for reaction, value in medium.items(): |
| 173 if value is not None: | 200 if value is not None: |
| 174 model.reactions.get_by_id(reaction).lower_bound = -float(value) | 201 model.reactions.get_by_id(reaction).lower_bound = -float(value) |
| 175 | 202 |
| 176 #if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": | |
| 177 # logging.basicConfig(level=logging.INFO) | |
| 178 # logger = logging.getLogger(__name__) | |
| 179 | |
| 180 #model = modelUtils.translate_model_genes( | |
| 181 # model=model, | |
| 182 # mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv"), dtype={'entrez_id': str}, | |
| 183 # target_nomenclature=ARGS.gene_format.replace("HGNC_", "HGNC "), | |
| 184 # source_nomenclature='HGNC_ID', | |
| 185 # logger=logger | |
| 186 #) | |
| 187 #model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) | |
| 188 | |
| 189 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": | 203 if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": |
| 190 logging.basicConfig(level=logging.INFO) | 204 logging.basicConfig(level=logging.INFO) |
| 191 logger = logging.getLogger(__name__) | 205 logger = logging.getLogger(__name__) |
| 192 | 206 |
| 193 model = modelUtils.translate_model_genes( | 207 model = modelUtils.translate_model_genes( |
| 211 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"]) | 225 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "GPR"]) |
| 212 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"]) | 226 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Formula"]) |
| 213 | 227 |
| 214 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 228 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) |
| 215 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) | 229 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) |
| 216 df_medium["InMedium"] = True # flag per indicare la presenza nel medium | 230 df_medium["InMedium"] = True |
| 217 | 231 |
| 218 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") | 232 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") |
| 219 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") | 233 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") |
| 220 merged = merged.merge(objective_function, on = "ReactionID", how = "outer") | 234 merged = merged.merge(objective_function, on = "ReactionID", how = "outer") |
| 221 if ARGS.name == "ENGRO2": | 235 if ARGS.name == "ENGRO2": |
| 224 | 238 |
| 225 merged["InMedium"] = merged["InMedium"].fillna(False) | 239 merged["InMedium"] = merged["InMedium"].fillna(False) |
| 226 | 240 |
| 227 merged = merged.sort_values(by = "InMedium", ascending = False) | 241 merged = merged.sort_values(by = "InMedium", ascending = False) |
| 228 | 242 |
| 229 #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data") | |
| 230 | |
| 231 #merged.to_csv(out_file, sep = '\t', index = False) | |
| 232 | |
| 233 #### | |
| 234 | |
| 235 if not ARGS.out_tabular: | 243 if not ARGS.out_tabular: |
| 236 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) | 244 raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) |
| 237 save_as_tabular_df(merged, ARGS.out_tabular) | 245 save_as_tabular_df(merged, ARGS.out_tabular) |
| 238 expected = ARGS.out_tabular | 246 expected = ARGS.out_tabular |
| 239 | 247 |
| 240 # verify output exists and non-empty | 248 # verify output exists and non-empty |
| 241 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: | 249 if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: |
| 242 raise utils.DataErr(expected, "Output non creato o vuoto") | 250 raise utils.DataErr(expected, "Output not created or empty") |
| 243 | 251 |
| 244 print("CustomDataGenerator: completed successfully") | 252 print("CustomDataGenerator: completed successfully") |
| 245 | 253 |
| 246 if __name__ == '__main__': | 254 if __name__ == '__main__': |
| 247 main() | 255 main() |
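The revised `load_custom_model` dispatches to a COBRApy reader per supported format (XML/SBML, JSON, MAT, YML). A minimal standalone sketch of the same dispatch, keyed on a plain file extension instead of the project's `utils.FilePath`/`utils.FileFormat` wrappers (the helper name `load_model_by_ext` is hypothetical):

```python
import os
import cobra

def load_model_by_ext(path: str) -> cobra.Model:
    """Pick the COBRApy reader that matches the file extension."""
    readers = {
        ".xml": cobra.io.read_sbml_model,   # SBML
        ".sbml": cobra.io.read_sbml_model,
        ".json": cobra.io.load_json_model,
        ".mat": cobra.io.load_matlab_model,
        ".yml": cobra.io.load_yaml_model,   # requires ruamel.yaml
    }
    ext = os.path.splitext(path)[1].lower()
    if ext not in readers:
        raise ValueError(f"Unrecognized format '{ext}'. Only JSON, XML, MAT, YML are supported.")
    return readers[ext](path)
```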
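The medium hunk in `main` first closes every exchange reaction currently in the model's medium, then applies the selected medium's uptake rates as negative lower bounds (the COBRA convention for uptake). A sketch of that pattern on a bare COBRApy model, where `medium_uptake` is an illustrative dict mapping exchange reaction IDs to non-negative uptake values:

```python
import cobra

def apply_medium(model: cobra.Model, medium_uptake: dict) -> None:
    # Close all exchanges that are currently open in the model's medium.
    for rxn_id in model.medium:
        model.reactions.get_by_id(rxn_id).lower_bound = 0.0

    # Re-open only the selected exchanges; uptake is a negative lower bound.
    for rxn_id, value in medium_uptake.items():
        if value is not None:
            model.reactions.get_by_id(rxn_id).lower_bound = -float(value)
```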
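The tabular summary is assembled by outer-merging the per-reaction frames (formulas, GPR rules, bounds, objective coefficients, medium membership) on `ReactionID`, filling the `InMedium` flag, and sorting medium reactions to the top. A condensed pandas sketch with small inline frames standing in for the model-derived data:

```python
import pandas as pd

# Illustrative stand-ins for the frames derived from the model.
df_reactions = pd.DataFrame({"ReactionID": ["R1", "R2"], "Formula": ["a --> b", "b <=> c"]})
df_rules = pd.DataFrame({"ReactionID": ["R1"], "GPR": ["geneA or geneB"]})
df_bounds = pd.DataFrame({"ReactionID": ["R1", "R2"],
                          "lower_bound": [-1000.0, 0.0],
                          "upper_bound": [1000.0, 1000.0]})
df_medium = pd.DataFrame({"ReactionID": ["R2"]})
df_medium["InMedium"] = True  # flag reactions present in the medium

merged = (
    df_reactions
    .merge(df_rules, on="ReactionID", how="outer")
    .merge(df_bounds, on="ReactionID", how="outer")
    .merge(df_medium, on="ReactionID", how="outer")
)
merged["InMedium"] = merged["InMedium"].fillna(False)
merged = merged.sort_values(by="InMedium", ascending=False)
print(merged)
```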
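`save_as_tabular_df` plus the final existence check follow a write-then-verify pattern: create the parent directory, write the frame as TSV, then fail if the file is missing or empty. A brief sketch of the same pattern (the wrapper name `write_and_verify_tsv` is hypothetical):

```python
import os
import pandas as pd

def write_and_verify_tsv(df: pd.DataFrame, path: str) -> None:
    # Create the parent directory if needed, then write tab-separated output.
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    df.to_csv(path, sep="\t", index=False)

    # Mirror the script's final sanity check: output must exist and be non-empty.
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        raise RuntimeError(f"{path}: output not created or empty")
```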
