Mercurial > repos > bimib > cobraxy
comparison COBRAxy/custom_data_generator_beta.py @ 418:919b5b71a61c draft
Uploaded
author | francesco_lapi |
---|---|
date | Tue, 09 Sep 2025 07:36:30 +0000 |
parents | 5086145cfb96 |
children | ed2c1f9e20ba |
comparison
equal
deleted
inserted
replaced
417:e8dd8dca9618 | 418:919b5b71a61c |
---|---|
6 import pandas as pd | 6 import pandas as pd |
7 import utils.general_utils as utils | 7 import utils.general_utils as utils |
8 import utils.rule_parsing as rulesUtils | 8 import utils.rule_parsing as rulesUtils |
9 from typing import Optional, Tuple, Union, List, Dict | 9 from typing import Optional, Tuple, Union, List, Dict |
10 import utils.reaction_parsing as reactionUtils | 10 import utils.reaction_parsing as reactionUtils |
11 import utils.model_utils as modelUtils | |
11 | 12 |
12 ARGS : argparse.Namespace | 13 ARGS : argparse.Namespace |
13 def process_args(args: List[str] = None) -> argparse.Namespace: | 14 def process_args(args: List[str] = None) -> argparse.Namespace: |
14 """ | 15 """ |
15 Parse command-line arguments for CustomDataGenerator. | 16 Parse command-line arguments for CustomDataGenerator. |
69 return cobra.io.load_json_model(file_path.show()) | 70 return cobra.io.load_json_model(file_path.show()) |
70 | 71 |
71 except Exception as e: raise utils.DataErr(file_path, e.__str__()) | 72 except Exception as e: raise utils.DataErr(file_path, e.__str__()) |
72 raise utils.DataErr(file_path, | 73 raise utils.DataErr(file_path, |
73 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") | 74 f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") |
74 | |
75 ################################- DATA GENERATION -################################ | |
76 ReactionId = str | |
77 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: | |
78 """ | |
79 Generates a dictionary mapping reaction ids to rules from the model. | |
80 | |
81 Args: | |
82 model : the model to derive data from. | |
83 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. | |
84 | |
85 Returns: | |
86 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. | |
87 Dict[ReactionId, str] : the generated dictionary of raw rules. | |
88 """ | |
89 # Is the below approach convoluted? yes | |
90 # Ok but is it inefficient? probably | |
91 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) | |
92 _ruleGetter = lambda reaction : reaction.gene_reaction_rule | |
93 ruleExtractor = (lambda reaction : | |
94 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter | |
95 | |
96 return { | |
97 reaction.id : ruleExtractor(reaction) | |
98 for reaction in model.reactions | |
99 if reaction.gene_reaction_rule } | |
100 | |
101 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]: | |
102 """ | |
103 Generates a dictionary mapping reaction ids to reaction formulas from the model. | |
104 | |
105 Args: | |
106 model : the model to derive data from. | |
107 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. | |
108 | |
109 Returns: | |
110 Dict[ReactionId, str] : the generated dictionary. | |
111 """ | |
112 | |
113 unparsedReactions = { | |
114 reaction.id : reaction.reaction | |
115 for reaction in model.reactions | |
116 if reaction.reaction | |
117 } | |
118 | |
119 if not asParsed: return unparsedReactions | |
120 | |
121 return reactionUtils.create_reaction_dict(unparsedReactions) | |
122 | |
123 def get_medium(model:cobra.Model) -> pd.DataFrame: | |
124 trueMedium=[] | |
125 for r in model.reactions: | |
126 positiveCoeff=0 | |
127 for m in r.metabolites: | |
128 if r.get_coefficient(m.id)>0: | |
129 positiveCoeff=1; | |
130 if (positiveCoeff==0 and r.lower_bound<0): | |
131 trueMedium.append(r.id) | |
132 | |
133 df_medium = pd.DataFrame() | |
134 df_medium["reaction"] = trueMedium | |
135 return df_medium | |
136 | |
137 def generate_bounds(model:cobra.Model) -> pd.DataFrame: | |
138 | |
139 rxns = [] | |
140 for reaction in model.reactions: | |
141 rxns.append(reaction.id) | |
142 | |
143 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns) | |
144 | |
145 for reaction in model.reactions: | |
146 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] | |
147 return bounds | |
148 | |
149 | |
150 | |
151 def generate_compartments(model: cobra.Model) -> pd.DataFrame: | |
152 """ | |
153 Generates a DataFrame containing compartment information for each reaction. | |
154 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.) | |
155 | |
156 Args: | |
157 model: the COBRA model to extract compartment data from. | |
158 | |
159 Returns: | |
160 pd.DataFrame: DataFrame with ReactionID and compartment columns | |
161 """ | |
162 pathway_data = [] | |
163 | |
164 # First pass: determine the maximum number of pathways any reaction has | |
165 max_pathways = 0 | |
166 reaction_pathways = {} | |
167 | |
168 for reaction in model.reactions: | |
169 # Get unique pathways from all metabolites in the reaction | |
170 if type(reaction.annotation['pathways']) == list: | |
171 reaction_pathways[reaction.id] = reaction.annotation['pathways'] | |
172 max_pathways = max(max_pathways, len(reaction.annotation['pathways'])) | |
173 else: | |
174 reaction_pathways[reaction.id] = [reaction.annotation['pathways']] | |
175 | |
176 # Create column names for pathways | |
177 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)] | |
178 | |
179 # Second pass: create the data | |
180 for reaction_id, pathways in reaction_pathways.items(): | |
181 row = {"ReactionID": reaction_id} | |
182 | |
183 # Fill pathway columns | |
184 for i in range(max_pathways): | |
185 col_name = pathway_columns[i] | |
186 if i < len(pathways): | |
187 row[col_name] = pathways[i] | |
188 else: | |
189 row[col_name] = None # or "" if you prefer empty strings | |
190 | |
191 pathway_data.append(row) | |
192 | |
193 return pd.DataFrame(pathway_data) | |
194 | 75 |
195 | 76 |
196 ###############################- FILE SAVING -################################ | 77 ###############################- FILE SAVING -################################ |
197 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 78 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: |
198 """ | 79 """ |
294 if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": | 175 if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": |
295 | 176 |
296 model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) | 177 model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) |
297 | 178 |
298 # generate data | 179 # generate data |
299 rules = generate_rules(model, asParsed = False) | 180 rules = modelUtils.generate_rules(model, asParsed = False) |
300 reactions = generate_reactions(model, asParsed = False) | 181 reactions = modelUtils.generate_reactions(model, asParsed = False) |
301 bounds = generate_bounds(model) | 182 bounds = modelUtils.generate_bounds(model) |
302 medium = get_medium(model) | 183 medium = modelUtils.get_medium(model) |
303 if ARGS.name == "ENGRO2": | 184 if ARGS.name == "ENGRO2": |
304 compartments = generate_compartments(model) | 185 compartments = modelUtils.generate_compartments(model) |
305 | 186 |
306 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) | 187 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) |
307 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) | 188 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) |
308 | 189 |
309 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 190 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) |