Mercurial > repos > bimib > cobraxy
comparison COBRAxy/custom_data_generator.py @ 404:08f1ff359397 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Mon, 08 Sep 2025 13:50:08 +0000 |
| parents | 05092b0cfca0 |
| children | 187cee1a00e2 |
comparison
equal
deleted
inserted
replaced
| 403:05092b0cfca0 | 404:08f1ff359397 |
|---|---|
| 157 model: the COBRA model to extract compartment data from. | 157 model: the COBRA model to extract compartment data from. |
| 158 | 158 |
| 159 Returns: | 159 Returns: |
| 160 pd.DataFrame: DataFrame with ReactionID and compartment columns | 160 pd.DataFrame: DataFrame with ReactionID and compartment columns |
| 161 """ | 161 """ |
| 162 compartment_data = [] | 162 pathway_data = [] |
| 163 | 163 |
| 164 # First pass: determine the maximum number of compartments any reaction has | 164 # First pass: determine the maximum number of pathways any reaction has |
| 165 max_compartments = 0 | 165 max_pathways = 0 |
| 166 reaction_compartments = {} | 166 reaction_pathways = {} |
| 167 | 167 |
| 168 for reaction in model.reactions: | 168 for reaction in model.reactions: |
| 169 # Get unique compartments from all metabolites in the reaction | 169 # Get unique pathways from all metabolites in the reaction |
| 170 if type(reaction.annotation['pathways']) == list: | 170 if type(reaction.annotation['pathways']) == list: |
| 171 reaction_compartments[reaction.id] = reaction.annotation['pathways'] | 171 reaction_pathways[reaction.id] = reaction.annotation['pathways'] |
| 172 max_compartments = max(max_compartments, len(reaction.annotation['pathways'])) | 172 max_pathways = max(max_pathways, len(reaction.annotation['pathways'])) |
| 173 else: | 173 else: |
| 174 reaction_compartments[reaction.id] = [reaction.annotation['pathways']] | 174 reaction_pathways[reaction.id] = [reaction.annotation['pathways']] |
| 175 | 175 |
| 176 # Create column names for compartments | 176 # Create column names for pathways |
| 177 compartment_columns = [f"Compartment_{i+1}" for i in range(max_compartments)] | 177 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)] |
| 178 | 178 |
| 179 # Second pass: create the data | 179 # Second pass: create the data |
| 180 for reaction_id, compartments in reaction_compartments.items(): | 180 for reaction_id, pathways in reaction_pathways.items(): |
| 181 row = {"ReactionID": reaction_id} | 181 row = {"ReactionID": reaction_id} |
| 182 | 182 |
| 183 # Fill compartment columns | 183 # Fill pathway columns |
| 184 for i in range(max_compartments): | 184 for i in range(max_pathways): |
| 185 col_name = compartment_columns[i] | 185 col_name = pathway_columns[i] |
| 186 if i < len(compartments): | 186 if i < len(pathways): |
| 187 row[col_name] = compartments[i] | 187 row[col_name] = pathways[i] |
| 188 | |
| 189 else: | 188 else: |
| 190 row[col_name] = None # or "" if you prefer empty strings | 189 row[col_name] = None # or "" if you prefer empty strings |
| 191 | 190 |
| 192 compartment_data.append(row) | 191 pathway_data.append(row) |
| 193 | 192 |
| 194 return pd.DataFrame(compartment_data) | 193 return pd.DataFrame(pathway_data) |
| 195 | 194 |
| 196 | 195 |
| 197 ###############################- FILE SAVING -################################ | 196 ###############################- FILE SAVING -################################ |
| 198 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: | 197 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None: |
| 199 """ | 198 """ |
| 299 # generate data | 298 # generate data |
| 300 rules = generate_rules(model, asParsed = False) | 299 rules = generate_rules(model, asParsed = False) |
| 301 reactions = generate_reactions(model, asParsed = False) | 300 reactions = generate_reactions(model, asParsed = False) |
| 302 bounds = generate_bounds(model) | 301 bounds = generate_bounds(model) |
| 303 medium = get_medium(model) | 302 medium = get_medium(model) |
| 304 compartments = generate_compartments(model) | 303 if ARGS.name == "ENGRO2": |
| | 304 compartments = generate_compartments(model) |
| 305 | 305 |
| 306 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) | 306 df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"]) |
| 307 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) | 307 df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) |
| 308 | 308 |
| 309 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) | 309 df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) |
| 310 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) | 310 df_medium = medium.rename(columns = {"reaction": "ReactionID"}) |
| 311 df_medium["InMedium"] = True # flag per indicare la presenza nel medium | 311 df_medium["InMedium"] = True # flag per indicare la presenza nel medium |
| 312 | 312 |
| 313 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") | 313 merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") |
| 314 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") | 314 merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") |
| 315 merged = merged.merge(compartments, on = "ReactionID", how = "outer") | 315 if ARGS.name == "ENGRO2": |
| | 316 merged = merged.merge(compartments, on = "ReactionID", how = "outer") |
| 316 merged = merged.merge(df_medium, on = "ReactionID", how = "left") | 317 merged = merged.merge(df_medium, on = "ReactionID", how = "left") |
| 317 | 318 |
| 318 merged["InMedium"] = merged["InMedium"].fillna(False) | 319 merged["InMedium"] = merged["InMedium"].fillna(False) |
| 319 | 320 |
| 320 merged = merged.sort_values(by = "InMedium", ascending = False) | 321 merged = merged.sort_values(by = "InMedium", ascending = False) |
