Mercurial > repos > bimib > cobraxy
comparison COBRAxy/custom_data_generator_beta.py @ 418:919b5b71a61c draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Tue, 09 Sep 2025 07:36:30 +0000 |
| parents | 5086145cfb96 |
| children | ed2c1f9e20ba |
comparison
equal
deleted
inserted
replaced
| 417:e8dd8dca9618 | 418:919b5b71a61c |
|---|---|
| 6 import pandas as pd | 6 import pandas as pd |
| 7 import utils.general_utils as utils | 7 import utils.general_utils as utils |
| 8 import utils.rule_parsing as rulesUtils | 8 import utils.rule_parsing as rulesUtils |
| 9 from typing import Optional, Tuple, Union, List, Dict | 9 from typing import Optional, Tuple, Union, List, Dict |
| 10 import utils.reaction_parsing as reactionUtils | 10 import utils.reaction_parsing as reactionUtils |
| 11 import utils.model_utils as modelUtils | |
| 11 | 12 |
| 12 ARGS : argparse.Namespace | 13 ARGS : argparse.Namespace |
| 13 def process_args(args: List[str] = None) -> argparse.Namespace: | 14 def process_args(args: List[str] = None) -> argparse.Namespace: |
| 14 """ | 15 """ |
| 15 Parse command-line arguments for CustomDataGenerator. | 16 Parse command-line arguments for CustomDataGenerator. |
| 69 return cobra.io.load_json_model(file_path.show()) | 70 return cobra.io.load_json_model(file_path.show()) |
| 70 | 71 |
| 71 except Exception as e: raise utils.DataErr(file_path, e.__str__()) | 72 except Exception as e: raise utils.DataErr(file_path, e.__str__()) |
| 72 raise utils.DataErr(file_path, | 73 raise utils.DataErr(file_path, |
| 73 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") | 74 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") |
| 74 | |
| 75 ################################- DATA GENERATION -################################ | |
| 76 ReactionId = str | |
| 77 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | |
| 78 """ | |
| 79 Generates a dictionary mapping reaction ids to rules from the model. | |
| 80 | |
| 81 Args: | |
| 82 model : the model to derive data from. | |
| 83 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. | |
| 84 | |
| 85 Returns: | |
| 86 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. | |
| 87 Dict[ReactionId, str] : the generated dictionary of raw rules. | |
| 88 """ | |
| 89 # Is the below approach convoluted? yes | |
| 90 # Ok but is it inefficient? probably | |
| 91 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) | |
| 92 _ruleGetter = lambda reaction : reaction.gene_reaction_rule | |
| 93 ruleExtractor = (lambda reaction : | |
| 94 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter | |
| 95 | |
| 96 return { | |
| 97 reaction.id : ruleExtractor(reaction) | |
| 98 for reaction in model.reactions | |
| 99 if reaction.gene_reaction_rule } | |
| 100 | |
| 101 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: | |
| 102 """ | |
| 103 Generates a dictionary mapping reaction ids to reaction formulas from the model. | |
| 104 | |
| 105 Args: | |
| 106 model : the model to derive data from. | |
| 107 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. | |
| 108 | |
| 109 Returns: | |
| 110 Dict[ReactionId, str] : the generated dictionary. | |
| 111 """ | |
| 112 | |
| 113 unparsedReactions = { | |
| 114 reaction.id : reaction.reaction | |
| 115 for reaction in model.reactions | |
| 116 if reaction.reaction | |
| 117 } | |
| 118 | |
| 119 if not asParsed: return unparsedReactions | |
| 120 | |
| 121 return reactionUtils.create_reaction_dict(unparsedReactions) | |
| 122 | |
| 123 def get_medium(model:cobra.Model) -> pd.DataFrame: | |
| 124 trueMedium=[] | |
| 125 for r in model.reactions: | |
| 126 positiveCoeff=0 | |
| 127 for m in r.metabolites: | |
| 128 if r.get_coefficient(m.id)>0: | |
| 129 positiveCoeff=1; | |
| 130 if (positiveCoeff==0 and r.lower_bound<0): | |
| 131 trueMedium.append(r.id) | |
| 132 | |
| 133 df_medium = pd.DataFrame() | |
| 134 df_medium["reaction"] = trueMedium | |
| 135 return df_medium | |
| 136 | |
| 137 def generate_bounds(model:cobra.Model) -> pd.DataFrame: | |
| 138 | |
| 139 rxns = [] | |
| 140 for reaction in model.reactions: | |
| 141 rxns.append(reaction.id) | |
| 142 | |
| 143 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns) | |
| 144 | |
| 145 for reaction in model.reactions: | |
| 146 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] | |
| 147 return bounds | |
| 148 | |
| 149 | |
| 150 | |
| 151 def generate_compartments(model: cobra.Model) -> pd.DataFrame: | |
| 152 """ | |
| 153 Generates a DataFrame containing compartment information for each reaction. | |
| 154 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) | |
| 155 | |
| 156 Args: | |
| 157 model: the COBRA model to extract compartment data from. | |
| 158 | |
| 159 Returns: | |
| 160 pd.DataFrame: DataFrame with ReactionID and compartment columns | |
| 161 """ | |
| 162 pathway_data = [] | |
| 163 | |
| 164 # First pass: determine the maximum number of pathways any reaction has | |
| 165 max_pathways = 0 | |
| 166 reaction_pathways = {} | |
| 167 | |
| 168 for reaction in model.reactions: | |
| 169 # Get unique pathways from all metabolites in the reaction | |
| 170 if type(reaction.annotation['pathways']) == list: | |
| 171 reaction_pathways[reaction.id] = reaction.annotation['pathways'] | |
| 172 max_pathways = max(max_pathways, len(reaction.annotation['pathways'])) | |
| 173 else: | |
| 174 reaction_pathways[reaction.id] = [reaction.annotation['pathways']] | |
| 175 | |
| 176 # Create column names for pathways | |
| 177 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)] | |
| 178 | |
| 179 # Second pass: create the data | |
| 180 for reaction_id, pathways in reaction_pathways.items(): | |
| 181 row = {"ReactionID": reaction_id} | |
| 182 | |
| 183 # Fill pathway columns | |
| 184 for i in range(max_pathways): | |
| 185 col_name = pathway_columns[i] | |
| 186 if i < len(pathways): | |
| 187 row[col_name] = pathways[i] | |
| 188 else: | |
| 189 row[col_name] = None # or "" if you prefer empty strings | |
| 190 | |
| 191 pathway_data.append(row) | |
| 192 | |
| 193 return pd.DataFrame(pathway_data) | |
| 194 | 75 |
| 195 | 76 |
| 196 ###############################- FILE SAVING -################################ | 77 ###############################- FILE SAVING -################################ |
| 197 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 78 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: |
| 198 """ | 79 """ |
| 294 if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": | 175 if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": |
| 295 | 176 |
| 296 model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) | 177 model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) |
| 297 | 178 |
| 298 # generate data | 179 # generate data |
| 299 rules = generate_rules(model, asParsed = False) | 180 rules = modelUtils.generate_rules(model, asParsed = False) |
| 300 reactions = generate_reactions(model, asParsed = False) | 181 reactions = modelUtils.generate_reactions(model, asParsed = False) |
| 301 bounds = generate_bounds(model) | 182 bounds = modelUtils.generate_bounds(model) |
| 302 medium = get_medium(model) | 183 medium = modelUtils.get_medium(model) |
| 303 if ARGS.name == "ENGRO2": | 184 if ARGS.name == "ENGRO2": |
| 304 compartments = generate_compartments(model) | 185 compartments = modelUtils.generate_compartments(model) |
| 305 | 186 |
| 306 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) | 187 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) |
| 307 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) | 188 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) |
| 308 | 189 |
| 309 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 190 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) |
