# HG changeset patch
# User francesco_lapi
# Date 1757698125 0
# Node ID a6e45049c1b93a6a1e14df159a38de774c8a5c09
# Parent 4e2bc80764b6a56510970c01560977993e50cb8a
Uploaded
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/README.md
--- a/COBRAxy/README.md Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/README.md Fri Sep 12 17:28:45 2025 +0000
@@ -1,11 +1,289 @@
-# Official repository for the COBRAxy toolset
-> COBRAxy (COBRApy in Galaxy) is a user-friendly tool that allows a user to user to characterize and to graphically compare simulated fluxomics coming from groups of samples with different transcriptional regulation of metabolism.
-It extends the MaREA 2 (Metabolic Reaction Enrichment Analysis) tool that enables users to compare groups in terms of RAS and RPS only. The tool is available as plug-in for the widely-used Galaxy platform for comparative genomics and bioinformatics analyses.
+# COBRAxy — Metabolic analysis and visualization toolkit (Galaxy-ready)
+
+COBRAxy (COBRApy in Galaxy) is a toolkit to compute, analyze, and visualize metabolism at the reaction level from transcriptomics and metabolomics data. It enables users to:
+
+- derive Reaction Activity Scores (RAS) from gene expression and Reaction Propensity Scores (RPS) from metabolite abundances,
+- integrate RAS into model bounds,
+- perform flux sampling with either CBS (constraint-based sampling) or OPTGP,
+- compute statistics (pFBA, FVA, sensitivity) and generate styled SVG/PDF metabolic maps,
+- run all tools as Galaxy wrappers or via CLI on any machine.
+
+It extends the MaREA 2 (Metabolic Reaction Enrichment Analysis) concept by adding sampling-based flux comparison and rich visualization. The repository ships both Python CLIs and Galaxy tool XMLs.
+
+## Table of contents
+
+- Overview and features
+- Requirements
+- Installation (pip/conda)
+- Quick start (CLI)
+- Tools and usage
+ - custom_data_generator
+ - ras_generator (RAS)
+ - rps_generator (RPS)
+ - ras_to_bounds
+ - flux_simulation (CBS/OPTGP)
+ - marea (enrichment + maps)
+ - flux_to_map (maps from fluxes)
+ - marea_cluster (clustering auxiliaries)
+- Typical workflow
+- Input/output formats
+- Galaxy usage
+- Troubleshooting
+- Contributing
+- License and citations
+- Useful links
+
+## Overview and features
+
+COBRAxy builds on COBRApy to deliver end‑to‑end analysis from expression/metabolite data to flux statistics and map rendering:
+
+- RAS and RPS computation from tabular inputs
+- Bounds integration and model preparation
+- Flux sampling: CBS (GLPK backend) with automatic fallback to a COBRApy interface, or OPTGP
+- Flux statistics: mean/median/quantiles, pFBA, FVA, sensitivity
+- Map styling/export: SVG with optional PDF/PNG export
+- Ready-made Galaxy wrappers for all tools
+
+Bundled resources in `local/` include example models (ENGRO2, Recon), gene mappings, a default medium, and SVG maps.
+
+## Requirements
+
+- OS: Linux, macOS, or Windows (Linux recommended; Galaxy typically runs on Linux)
+- Python: 3.8.20 ≤ version < 3.12 (as per `setup.py`)
+- Python packages (installed automatically by `pip install .`):
+ - cobra==0.29.0, numpy==1.24.4, pandas==2.0.3, scipy==1.11, scikit-learn==1.3.2, seaborn==0.13.0
+ - matplotlib==3.7.3, lxml==5.2.2, cairosvg==2.7.1, svglib==1.5.1, pyvips==2.2.3, Pillow
+ - joblib==1.4.2, anndata==0.8.0, pydeseq2==0.5.1
+- Optional but recommended for CBS sampling performance:
+ - GLPK solver and Python bindings
+ - System library: glpk (e.g., Ubuntu: `apt-get install glpk-utils libglpk40`)
+ - Python: `swiglpk` (note: CBS falls back to a COBRApy interface if GLPK is unavailable)
+- For pyvips: system libvips (e.g., Ubuntu: `apt-get install libvips`)
+
+Notes:
+- If you hit system-level library errors for SVG/PDF/PNG conversion or vips, install the corresponding OS packages.
+- GPU is not required.
+
+## Installation
+
+A Python virtual environment is strongly recommended.
+
+### Install from source (pip)
+
+1) Clone the repo and install:
+
+```bash
+git clone https://github.com/CompBtBs/COBRAxy.git
+cd COBRAxy
+python3 -m venv .venv && source .venv/bin/activate
+pip install --upgrade pip
+pip install .
+```
+
+This installs console entry points: `custom_data_generator`, `ras_generator`, `rps_generator`, `ras_to_bounds`, `flux_simulation`, `flux_to_map`, `marea`, `marea_cluster`.
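+
+To check that the entry points are available, any of them can be invoked with `--help` (all tools support `-h/--help`):
+
+```bash
+ras_generator --help
+flux_simulation --help
+```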
+
+### Install with conda (alternative)
+
+```bash
+conda create -n cobraxy python=3.10 -y
+conda activate cobraxy
+pip install .
+# Optional system deps (Ubuntu): sudo apt-get install libvips libxml2 libxslt1.1 glpk-utils
+# Optional Python bindings for GLPK: pip install swiglpk
+```
+
+## Quick start (CLI)
+
+All tools provide `-h/--help` for details. Outputs are TSV/CSV and SVG/PDF files depending on the tool and flags.
+
+Example minimal flow (using built-in ENGRO2 model and provided assets):
+
+```bash
+# 1) Generate rules/reactions/bounds/medium from a model (optional if using bundled ones)
+custom_data_generator \
+ -id local/models/ENGRO2.xml \
+ -mn ENGRO2.xml \
+ -orules out/ENGRO2_rules.tsv \
+ -orxns out/ENGRO2_reactions.tsv \
+ -omedium out/ENGRO2_medium.tsv \
+ -obnds out/ENGRO2_bounds.tsv
+
+# 2) Compute RAS from expression data
+ras_generator \
+ -td $(pwd) \
+ -in my_expression.tsv \
+ -ra out/ras.tsv \
+ -rs ENGRO2
+
+# 3) Integrate RAS into bounds
+ras_to_bounds \
+ -td $(pwd) \
+ -ms ENGRO2 \
+ -ir out/ras.tsv \
+ -rs true \
+ -idop out/ras_bounds
+
+# 4) Flux sampling (CBS)
+flux_simulation \
+ -td $(pwd) \
+ -ms ENGRO2 \
+ -in out/ras_bounds/sample1.tsv,out/ras_bounds/sample2.tsv \
+ -ni sample1,sample2 \
+ -a CBS -ns 500 -sd 0 -nb 1 \
+ -ot mean,median,quantiles \
+ -ota pFBA,FVA,sensitivity \
+ -idop out/flux
-## Useful links:
+# 5) Enrichment + map styling (RAS/RPS or fluxes)
+marea \
+ -td $(pwd) \
+ -using_RAS true -input_data out/ras.tsv \
+ -comparison manyvsmany -test ks \
+ -generate_svg true -generate_pdf true \
+ -choice_map ENGRO2 -idop out/maps
+```
+
+## Tools and usage
+
+Below is a high‑level summary of each CLI. Use `--help` for the full list of options.
+
+### 1) custom_data_generator
+
+Generate model‑derived assets.
+
+Required inputs:
+- `-id/--input`: model file (XML or JSON; gz/zip/bz2 also supported via extension)
+- `-mn/--name`: the original file name including extension (Galaxy renames files; this preserves the true format)
+- `-orules`, `-orxns`, `-omedium`, `-obnds`: output paths
+
+Outputs:
+- TSV with rules, reactions, exchange medium, and bounds.
+
+### 2) ras_generator (Reaction Activity Scores)
+
+Compute RAS from a gene expression table.
+
+Key inputs:
+- `-td/--tool_dir`: repository root path (used to locate `local/` assets)
+- `-in/--input`: expression TSV (rows: genes; columns: samples)
+- `-rs/--rules_selector`: model/rules choice, e.g. `ENGRO2` or `Custom` with `-rl` and `-rn`
+- Optional: `-rl/--rule_list` custom rules TSV, `-rn/--rules_name` its original name/extension
+- Output: `-ra/--ras_output` TSV
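+
+As a sketch, a run with custom rules might look like this (file names are illustrative; see the quick start for the bundled ENGRO2 case):
+
+```bash
+ras_generator \
+  -td $(pwd) \
+  -in my_expression.tsv \
+  -rs Custom \
+  -rl my_rules.tsv \
+  -rn my_rules.tsv \
+  -ra out/ras_custom.tsv
+```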
+
+### 3) rps_generator (Reaction Propensity Scores)
+
+Compute RPS from a metabolite abundance table.
+
+Key inputs:
+- `-td/--tool_dir`: repository root
+- `-id/--input`: metabolite TSV (rows: metabolites; columns: samples)
+- `-rc/--reaction_choice`: `default` or `custom` with `-cm/--custom` reactions TSV
+- Output: `-rp/--rps_output` TSV
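+
+A minimal invocation might look like this (file names are illustrative):
+
+```bash
+rps_generator \
+  -td $(pwd) \
+  -id my_metabolites.tsv \
+  -rc default \
+  -rp out/rps.tsv
+```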
+
+### 4) ras_to_bounds
+
+Integrate RAS into reaction bounds for a given model and medium.
+
+Key inputs:
+- `-td/--tool_dir`: repository root
+- `-ms/--model_selector`: one of `ENGRO2` or `Custom` with `-mo/--model` and `-mn/--model_name`
+- Medium: `-mes/--medium_selector` (default `allOpen`) or `-meo/--medium` custom TSV
+- RAS: `-ir/--input_ras` and `-rs/--ras_selector` (true/false)
+- Output folder: `-idop/--output_path`
+
+Outputs:
+- One bounds TSV per sample in the RAS table.
+
+### 5) flux_simulation
+
+Flux sampling with CBS or OPTGP and downstream statistics.
+
+Key inputs:
+- `-td/--tool_dir`
+- Model: `-ms/--model_selector` (ENGRO2 or Custom with `-mo`/`-mn`)
+- Bounds files: `-in` (comma‑separated list) and `-ni/--names` (comma‑separated sample names)
+- Algorithm: `-a CBS|OPTGP`; CBS uses GLPK if available and falls back to a COBRApy interface
+- Sampling params: `-ns/--n_samples`, `-th/--thinning` (OPTGP), `-nb/--n_batches`, `-sd/--seed`
+- Outputs: `-ot/--output_type` (mean,median,quantiles) and `-ota/--output_type_analysis` (pFBA,FVA,sensitivity)
+- Output path: `-idop/--output_path`
+
+Outputs:
+- Per‑sample or aggregated CSV/TSV with flux samples and statistics.
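+
+As a sketch, an OPTGP variant of the quick-start call could look like this (the thinning value is illustrative):
+
+```bash
+flux_simulation \
+  -td $(pwd) \
+  -ms ENGRO2 \
+  -in out/ras_bounds/sample1.tsv \
+  -ni sample1 \
+  -a OPTGP -ns 500 -th 100 -nb 1 -sd 0 \
+  -ot mean,median,quantiles \
+  -idop out/flux_optgp
+```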
+
+### 6) marea
+
+Statistical enrichment and map styling for RAS and/or RPS groups with optional DESeq2‑style testing via `pydeseq2`.
+
+Key inputs:
+- `-td/--tool_dir`
+- Comparison: `-co manyvsmany|onevsrest|onevsmany`
+- Test: `-te ks|ttest_p|ttest_ind|wilcoxon|mw|DESeq`
+- Thresholds: `-pv`, `-adj` (FDR), `-fc`
+- Data: RAS `-using_RAS` plus `-input_data` or multiple datasets with names; similarly for RPS with `-using_RPS`
+- Map: `-choice_map HMRcore|ENGRO2|Custom` or `-custom_map` SVG
+- Output: `-gs/--generate_svg`, `-gp/--generate_pdf`, output dir `-idop`
+
+Outputs:
+- Styled SVG (and optional PDF/PNG) highlighting enriched reactions by color/width per your thresholds.
+
+### 7) flux_to_map
+
+Like `marea`, but driven by fluxes instead of RAS/RPS. Accepts single or multiple flux datasets and produces styled maps.
+
+### 8) marea_cluster
+
+Convenience clustering utilities (k‑means, DBSCAN, hierarchical) for grouping samples; produces labels and optional plots.
+
+## Typical workflow
+
+1. Prepare a model and generate its assets (optional if using bundled assets): `custom_data_generator`
+2. Compute RAS from expression: `ras_generator` (and/or compute RPS via `rps_generator`)
+3. Integrate RAS into bounds: `ras_to_bounds`
+4. Sample fluxes: `flux_simulation` with CBS or OPTGP
+5. Analyze and visualize: `marea` or `flux_to_map` to render SVG/PDF metabolic maps
+6. Optionally cluster or further analyze results: `marea_cluster`
+
+## Input/output formats
+
+Unless otherwise stated, inputs are tab‑separated (TSV) text files with headers.
+
+- Expression (RAS): rows = genes (HGNC/Ensembl/symbol/Entrez supported), columns = samples
+- Metabolite table (RPS): rows = metabolites, columns = samples
+- Rules/Reactions: TSV with two columns: ReactionID, Rule/Reaction
+- Bounds: TSV with index = reaction IDs, columns = lower_bound, upper_bound
+- Medium: single‑column TSV listing exchange reactions
+- Flux samples/statistics: CSV/TSV with reactions as rows and samples/statistics as columns
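+
+For instance, a tab‑separated expression table for `ras_generator` might look like this (header, gene IDs, and values are illustrative):
+
+```
+Gene    sampleA    sampleB
+A2M     5.31       2.04
+PKM     12.80      9.15
+```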
+
+## Galaxy usage
+
+Each CLI has a corresponding Galaxy tool XML in the repository (e.g., `marea.xml`, `flux_simulation.xml`). Use `shed.yml` to publish to a Galaxy toolshed. The `local/` directory provides models, mappings, and maps for out‑of‑the‑box runs inside Galaxy.
+
+## Troubleshooting
+
+- GLPK/CBS issues: if `swiglpk` or GLPK is missing, `flux_simulation` will attempt a COBRApy fallback. Install GLPK + `swiglpk` for best performance.
+- pyvips errors: install `libvips` on your system. Reinstall the `pyvips` wheel afterward if needed.
+- PDF/SVG conversions: ensure `cairosvg`, `svglib`, and system libraries (`libxml2`, `libxslt`) are installed.
+- Python version: stick to Python ≥3.8.20 and <3.12.
+- Memory/time: reduce `-ns` (samples) or `-nb` (batches); consider OPTGP if CBS is slow for your model.
+
+## Contributing
+
+Pull requests are welcome. Please:
+- keep changes focused and documented,
+- add concise docstrings/comments in English,
+- preserve public CLI parameters and file formats.
+
+## License and citations
+
+This project is distributed under the MIT License. If you use COBRAxy in academic work, please cite COBRApy and MaREA, and reference this repository.
+
+## Useful links
+
- COBRAxy Google Summer of Code 2024: https://summerofcode.withgoogle.com/programs/2024/projects/LSrCKfq7
- COBRApy: https://opencobra.github.io/cobrapy/
- MaREA4Galaxy: https://galaxyproject.org/use/marea4galaxy/
- Galaxy project: https://usegalaxy.org/
-
-## Documentation:
\ No newline at end of file
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/custom_data_generator_beta.py
--- a/COBRAxy/custom_data_generator_beta.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/custom_data_generator_beta.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,13 +1,19 @@
+"""
+Custom data generator for COBRA models.
+
+This script loads a COBRA model (built-in or custom), optionally applies
+medium and gene nomenclature settings, derives reaction-related metadata
+(GPR rules, formulas, bounds, objective coefficients, medium membership,
+and compartments for ENGRO2), and writes a tabular summary.
+"""
+
import os
import csv
import cobra
-import pickle
import argparse
import pandas as pd
import utils.general_utils as utils
-import utils.rule_parsing as rulesUtils
-from typing import Optional, Tuple, Union, List, Dict
-import utils.reaction_parsing as reactionUtils
+from typing import Optional, Tuple, List
import utils.model_utils as modelUtils
import logging
@@ -50,7 +56,7 @@
################################- INPUT DATA LOADING -################################
def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
"""
- Loads a custom model from a file, either in JSON or XML format.
+ Loads a custom model from a file, either in JSON, XML, MAT, or YML format.
Args:
file_path : The path to the file containing the custom model.
@@ -70,9 +76,17 @@
if ext is utils.FileFormat.JSON:
return cobra.io.load_json_model(file_path.show())
+ if ext is utils.FileFormat.MAT:
+ return cobra.io.load_matlab_model(file_path.show())
+
+ if ext is utils.FileFormat.YML:
+ return cobra.io.load_yaml_model(file_path.show())
+
except Exception as e: raise utils.DataErr(file_path, e.__str__())
- raise utils.DataErr(file_path,
- f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML")
+ raise utils.DataErr(
+ file_path,
+ f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported."
+ )
###############################- FILE SAVING -################################
@@ -115,6 +129,19 @@
writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
+ """
+ Save a pandas DataFrame as a tab-separated file, creating directories as needed.
+
+ Args:
+ df: The DataFrame to write.
+ path: Destination file path (will be written as TSV).
+
+ Raises:
+ DataErr: If writing the output fails for any reason.
+
+ Returns:
+ None
+ """
try:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
df.to_csv(path, sep="\t", index=False)
@@ -125,22 +152,22 @@
###############################- ENTRY POINT -################################
def main(args:List[str] = None) -> None:
"""
- Initializes everything and sets the program in motion based on the fronted input arguments.
+ Initialize and generate custom data based on the frontend input arguments.
Returns:
None
"""
- # get args from frontend (related xml)
+ # Parse args from frontend (Galaxy XML)
global ARGS
ARGS = process_args(args)
if ARGS.input:
- # load custom model
+ # Load a custom model from file
model = load_custom_model(
utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
else:
- # load built-in model
+ # Load a built-in model
try:
model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2']
@@ -164,28 +191,15 @@
medium = df_mediums[[ARGS.medium_selector]]
medium = medium[ARGS.medium_selector].to_dict()
- # Set all reactions to zero in the medium
+    # Reset lower bounds of all medium reactions to zero
for rxn_id, _ in model.medium.items():
model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
- # Set medium conditions
+ # Apply selected medium uptake bounds (negative for uptake)
for reaction, value in medium.items():
if value is not None:
model.reactions.get_by_id(reaction).lower_bound = -float(value)
- #if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default":
- # logging.basicConfig(level=logging.INFO)
- # logger = logging.getLogger(__name__)
-
- #model = modelUtils.translate_model_genes(
- # model=model,
- # mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv"), dtype={'entrez_id': str},
- # target_nomenclature=ARGS.gene_format.replace("HGNC_", "HGNC "),
- # source_nomenclature='HGNC_ID',
- # logger=logger
- #)
- #model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC "))
-
if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default":
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -213,7 +227,7 @@
df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
df_medium = medium.rename(columns = {"reaction": "ReactionID"})
- df_medium["InMedium"] = True # flag per indicare la presenza nel medium
+ df_medium["InMedium"] = True
merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
@@ -226,12 +240,6 @@
merged = merged.sort_values(by = "InMedium", ascending = False)
- #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data")
-
- #merged.to_csv(out_file, sep = '\t', index = False)
-
- ####
-
if not ARGS.out_tabular:
raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
save_as_tabular_df(merged, ARGS.out_tabular)
@@ -239,7 +247,7 @@
# verify output exists and non-empty
if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
- raise utils.DataErr(expected, "Output non creato o vuoto")
+ raise utils.DataErr(expected, "Output not created or empty")
print("CustomDataGenerator: completed successfully")
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/flux_simulation_beta.py
--- a/COBRAxy/flux_simulation_beta.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/flux_simulation_beta.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,8 +1,19 @@
+"""
+Flux sampling and analysis utilities for COBRA models.
+
+This script supports two modes:
+- Mode 1 (model_and_bounds=True): load a base model and apply bounds from
+ separate files before sampling.
+- Mode 2 (model_and_bounds=False): load complete models and sample directly.
+
+Sampling algorithms supported: OPTGP and CBS. Outputs include flux samples
+and optional analyses (pFBA, FVA, sensitivity), saved as tabular files.
+"""
+
import argparse
import utils.general_utils as utils
-from typing import Optional, List
+from typing import List
import os
-import numpy as np
import pandas as pd
import cobra
import utils.CBS_backend as CBS_backend
@@ -121,6 +132,17 @@
def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None:
+ """
+    Write a DataFrame as tab-separated text to "<ARGS.output_path>/<name>.csv".
+
+ Args:
+ dataset: The DataFrame to write.
+ name: Base file name (without extension).
+ keep_index: Whether to keep the DataFrame index in the file.
+
+ Returns:
+ None
+ """
dataset.index.name = 'Reactions'
dataset.to_csv(ARGS.output_path + "/" + name + ".csv", sep = '\t', index = keep_index)
@@ -180,7 +202,6 @@
for i in range(0, n_batches):
os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv')
- pass
def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None:
@@ -224,7 +245,6 @@
for i in range(0, n_batches):
os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv')
- pass
@@ -393,7 +413,7 @@
############################# main ###########################################
def main(args :List[str] = None) -> None:
"""
- Initializes everything and sets the program in motion based on the fronted input arguments.
+ Initialize and run sampling/analysis based on the frontend input arguments.
Returns:
None
@@ -407,11 +427,6 @@
if not os.path.exists(ARGS.output_path):
os.makedirs(ARGS.output_path)
- #ARGS.bounds = ARGS.input.split(",")
- #ARGS.bounds_name = ARGS.name.split(",")
- #ARGS.output_types = ARGS.output_type.split(",")
- #ARGS.output_type_analysis = ARGS.output_type_analysis.split(",")
-
# --- Normalize inputs (the tool may pass comma-separated --input and either --name or --names) ---
ARGS.input_files = ARGS.input.split(",") if ARGS.input else []
ARGS.file_names = ARGS.name.split(",")
@@ -439,14 +454,14 @@
validation = model_utils.validate_model(base_model)
- print("\n=== VALIDAZIONE MODELLO ===")
+ print("\n=== MODEL VALIDATION ===")
for key, value in validation.items():
print(f"{key}: {value}")
- #Set solver verbosity to 1 to see warning and error messages only.
+ # Set solver verbosity to 1 to see warning and error messages only.
base_model.solver.configuration.verbosity = 1
- # Process each bounds file with the base model
+ # Process each bounds file with the base model
results = Parallel(n_jobs=num_processors)(
delayed(model_sampler_with_bounds)(base_model, bounds_file, cell_name)
for bounds_file, cell_name in zip(ARGS.input_files, ARGS.file_names)
@@ -498,7 +513,7 @@
all_sensitivity = all_sensitivity.sort_index()
write_to_file(all_sensitivity.T, "sensitivity", True)
- pass
+ return
##############################################################################
if __name__ == "__main__":
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/fromCSVtoCOBRA_beta.py
--- a/COBRAxy/fromCSVtoCOBRA_beta.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/fromCSVtoCOBRA_beta.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,18 +1,25 @@
+"""
+Convert a tabular (CSV/TSV) description of a COBRA model into a COBRA model file.
+
+Supported output formats: SBML, JSON, MATLAB (.mat), YAML.
+The script logs to a user-provided file for easier debugging in Galaxy.
+"""
+
import os
-import csv
import cobra
-import pickle
import argparse
-import pandas as pd
-import utils.general_utils as utils
-from typing import Optional, Tuple, Union, List, Dict
+from typing import List
import logging
-import utils.rule_parsing as rulesUtils
-import utils.reaction_parsing as reactionUtils
import utils.model_utils as modelUtils
ARGS : argparse.Namespace
def process_args(args: List[str] = None) -> argparse.Namespace:
+ """
+ Parse command-line arguments for the CSV-to-COBRA conversion tool.
+
+ Returns:
+ argparse.Namespace: Parsed arguments.
+ """
parser = argparse.ArgumentParser(
usage="%(prog)s [options]",
description="Convert a tabular/CSV file to a COBRA model"
@@ -45,6 +52,13 @@
###############################- ENTRY POINT -################################
def main(args: List[str] = None) -> None:
+ """
+ Entry point: parse arguments, build the COBRA model from a CSV/TSV file,
+ and save it in the requested format.
+
+ Returns:
+ None
+ """
global ARGS
ARGS = process_args(args)
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/local/readme.txt
--- a/COBRAxy/local/readme.txt Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/local/readme.txt Fri Sep 12 17:28:45 2025 +0000
@@ -4,6 +4,4 @@
{keys = possibili codifiche dei geni : value = { keys = nome dei geni nella codifica corrispondente : 'ok' } }
Regole:
-{keys = possibili codifiche dei geni : value = { keys = nome della reazione/metabolita : [ lista di stringhe contenente la regola nella codifica corrispondente alla keys ] } }
-
-README DA RIVEDERE
\ No newline at end of file
+{keys = possible gene encodings : value = { keys = reaction/metabolite name : [ list of strings containing the rule in the encoding corresponding to the key ] } }
\ No newline at end of file
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/marea.py
--- a/COBRAxy/marea.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/marea.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,3 +1,10 @@
+"""
+MAREA: Enrichment and map styling for RAS/RPS data.
+
+This module compares groups of samples using RAS (Reaction Activity Scores) and/or
+RPS (Reaction Propensity Scores), computes statistics (p-values, z-scores, fold change),
+and applies visual styling to an SVG metabolic map (with optional PDF/PNG export).
+"""
from __future__ import division
import csv
from enum import Enum
@@ -26,13 +33,13 @@
ARGS :argparse.Namespace
def process_args(args:List[str] = None) -> argparse.Namespace:
"""
- Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
+ Parse command-line arguments exposed by the Galaxy frontend for this module.
Args:
- args : Always obtained (in file) from sys.argv
+ args: Optional list of arguments, defaults to sys.argv when None.
Returns:
- Namespace : An object containing the parsed arguments
+ Namespace: Parsed arguments.
"""
parser = argparse.ArgumentParser(
usage = "%(prog)s [options]",
@@ -265,8 +272,6 @@
tmp.append('stroke-dasharray:' + dash)
return ';'.join(tmp)
-# TODO: remove, there's applyRPS whatever
-# The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix!
def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
"""
Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
@@ -343,18 +348,15 @@
f"//*[@id=\"{reactionId}\"]").map(
lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
- # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user.
def styleMapElement(element :ET.Element, styleStr :str) -> None:
+ """Append/override stroke-related styles on a given SVG element."""
currentStyles :str = element.get("style", "")
if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
- currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure?
-
- #TODO: this is attempting to solve the styling override problem, not sure it does tho
+ currentStyles = ';'.join(currentStyles.split(';')[:-3])
element.set("style", currentStyles + styleStr)
-# TODO: maybe remove vvv
class ReactionDirection(Enum):
Unknown = ""
Direct = "_F"
@@ -372,6 +374,7 @@
return ReactionDirection.fromDir(reactionId[-2:])
def getArrowBodyElementId(reactionId :str) -> str:
+ """Return the SVG element id for a reaction arrow body, normalizing direction tags."""
if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
return f"R_{reactionId}"
@@ -401,13 +404,9 @@
UpRegulated = "#ecac68" # orange, up-regulated reaction
DownRegulated = "#6495ed" # lightblue, down-regulated reaction
- UpRegulatedInv = "#FF0000"
- # ^^^ bright red, up-regulated net value for a reversible reaction with
- # conflicting enrichment in the two directions.
+ UpRegulatedInv = "#FF0000" # bright red for reversible with conflicting directions
- DownRegulatedInv = "#0000FF"
- # ^^^ bright blue, down-regulated net value for a reversible reaction with
- # conflicting enrichment in the two directions.
+ DownRegulatedInv = "#0000FF" # bright blue for reversible with conflicting directions
@classmethod
def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
@@ -444,7 +443,7 @@
ERRORS.append(reactionId)
def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
- # If We're dealing with RAS data or in general don't care about the direction of the reaction we only style the arrow body
+ # If direction is irrelevant (e.g., RAS), style only the arrow body
if not mindReactionDir:
return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
@@ -453,24 +452,6 @@
self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
- # TODO: this seems to be unused, remove
- def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
- """
- Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
-
- Args:
- reactionId: the reaction ID, as encoded in the dataset.
- mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
-
- Returns:
- str : the ID of an arrow's body or tips in the map.
- """
- # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
- if not mindReactionDir: return "R_" + reactionId
-
- #TODO: this is clearly something we need to make consistent in RPS
- return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[:-2] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
-
def toStyleStr(self, *, downSizedForTips = False) -> str:
"""
Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
@@ -482,13 +463,11 @@
if downSizedForTips: width *= 0.8
return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
-# vvv These constants could be inside the class itself a static properties, but python
-# was built by brainless organisms so here we are!
+# Default arrows used for different significance states
INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
TRANSPARENT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Transparent) # Who cares how big it is if it's transparent
-# TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever
def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
"""
Applies RPS enrichment results to the provided metabolic map.
@@ -581,7 +560,7 @@
if l:
class_pat[classe] = {
- "values": list(map(list, zip(*l))), # trasposta
+ "values": list(map(list, zip(*l))), # transpose
"samples": sample_ids
}
continue
@@ -592,7 +571,7 @@
return class_pat
############################ conversion ##############################################
-#conversion from svg to png
+# Conversion from SVG to PNG
def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
"""
Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.
@@ -660,14 +639,8 @@
Returns:
utils.FilePath : _description_
"""
- # This function returns a util data structure but is extremely specific to this module.
- # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
- # TODO: until a third tool with multiple outputs appears before porting this to utils.
return utils.FilePath(
f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
- # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
- # all output files: I don't care, this was never the performance bottleneck of the tool and
- # there is no other net gain in saving and re-using the built string.
ext,
prefix = ARGS.output_path)
@@ -683,10 +656,8 @@
if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
writer.writerow({ field : data for field, data in zip(fieldNames, row) })
-OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
+OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]]
def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
- # this function compiles the things always in common between comparison modes after enrichment.
- # TODO: organize, name better.
suffix = "RAS" if ras_enrichment else "RPS"
writeToCsv(
[ [reactId] + values for reactId, values in tmp.items() ],
@@ -809,7 +780,6 @@
# TODO: the net RPS computation should be done in the RPS module
def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float, Dict[str, Tuple[np.ndarray, np.ndarray]]]:
- #TODO: the following code still suffers from "dumbvarnames-osis"
netRPS :Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {}
count = 0
@@ -818,7 +788,7 @@
for l1, l2 in zip(dataset1Data, dataset2Data):
reactId = ids[count]
count += 1
- if not reactId: continue # we skip ids that have already been processed
+ if not reactId: continue
try: #TODO: identify the source of these errors and minimize code in the try block
reactDir = ReactionDirection.fromReactionId(reactId)
@@ -947,13 +917,11 @@
except Exception as e:
raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}')
- if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not
+ if not ARGS.generate_svg:
os.remove(svgFilePath.show())
ClassPat = Dict[str, List[List[float]]]
def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat, Dict[str, List[str]]]:
- # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
- # for the sake of everyone's sanity.
columnNames :Dict[str, List[str]] = {} # { datasetName : [ columnName, ... ], ... }
class_pat :ClassPat = {}
if ARGS.option == 'datasets':
@@ -983,9 +951,7 @@
columnNames[clas] = ["Reactions", *values_and_samples_id["samples"]]
return ids, class_pat, columnNames
- #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
-#TODO: create these damn args as FilePath objects
def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
"""
Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
@@ -1025,18 +991,16 @@
ARGS.tool_dir,
utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
- # TODO: in the future keep the indices WITH the data and fix the code below.
-
# Prepare enrichment results containers
ras_results = []
rps_results = []
# Compute RAS enrichment if requested
- if ARGS.using_RAS: # vvv columnNames only matter with RPS data
+ if ARGS.using_RAS:
ids_ras, class_pat_ras, _ = getClassesAndIdsFromDatasets(
ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names)
ras_results, _ = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True)
- # ^^^ netRPS only matter with RPS data
+
# Compute RPS enrichment if requested
if ARGS.using_RPS:
@@ -1065,7 +1029,7 @@
# Apply RPS styling to arrow heads
if res.get('rps'):
tmp_rps, max_z_rps = res['rps']
- # applyRpsEnrichmentToMap styles only heads unless only RPS are used
+
temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False)
# Output both SVG and PDF/PNG as configured
@@ -1076,7 +1040,6 @@
for datasetName, rows in netRPS.items():
writeToCsv(
[[reactId, *netValues] for reactId, netValues in rows.items()],
- # vvv In weird comparison modes the dataset names are not recorded properly..
columnNames.get(datasetName, ["Reactions"]),
utils.FilePath(
"Net_RPS_" + datasetName,
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/ras_generator_beta.py
--- a/COBRAxy/ras_generator_beta.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/ras_generator_beta.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,5 +1,10 @@
+"""
+Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.
+
+The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
+computes RAS per reaction for each sample/cell line, and writes a tabular output.
+"""
from __future__ import division
-# galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
import sys
import argparse
import collections
@@ -8,7 +13,6 @@
import utils.general_utils as utils
import utils.rule_parsing as ruleUtils
from typing import Union, Optional, List, Dict, Tuple, TypeVar
-import os
ERRORS = []
########################## argparse ##########################################
@@ -31,7 +35,7 @@
help = "path to input file containing the rules")
parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
- # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
+    # Galaxy converts files into .dat; this helps infer the original extension when needed.
parser.add_argument(
'-n', '--none',
@@ -49,7 +53,7 @@
help = "Output log")
parser.add_argument(
- '-in', '--input', #id è diventato in
+ '-in', '--input',
type = str,
help = 'input dataset')
@@ -253,14 +257,14 @@
############################ resolve ##########################################
def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
"""
- Replace gene identifiers with corresponding values from a dictionary.
+ Replace gene identifiers in a parsed rule expression with values from a dict.
Args:
- l (str): String of gene identifier.
- d (str): String corresponding to its value.
+ l: Parsed rule as a nested list structure (strings, lists, and operators).
+ d: Dict mapping gene IDs to numeric values.
Returns:
- tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
+ tuple: (new_expression, not_found_genes)
"""
tmp = []
err = []
@@ -277,16 +281,16 @@
l = l[1:]
return (tmp, err)
-def replace_gene(l :str, d :str) -> Union[int, float]:
+def replace_gene(l: str, d: Dict[str, Union[int, float]]) -> Union[int, float, None]:
"""
Replace a single gene identifier with its corresponding value from a dictionary.
Args:
l (str): Gene identifier to replace.
- d (str): String corresponding to its value.
+ d (dict): Dict mapping gene IDs to numeric values.
Returns:
- float/int: Corresponding value from the dictionary if found, None otherwise.
+ float/int/None: Corresponding value from the dictionary if found, None otherwise.
Raises:
sys.exit: If the value associated with the gene identifier is not valid.
@@ -508,9 +512,9 @@
Args:
dataset (pd.DataFrame): Dataset containing gene values.
rules (dict): The dict containing reaction ids as keys and rules as values.
-
- Side effects:
- dataset : mut
+
+ Note:
+ Modifies dataset in place by setting the first column as index.
Returns:
dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
@@ -590,11 +594,11 @@
def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
"""
- Save computed ras scores to the given path, as a tsv file.
+ Save computed RAS scores to ARGS.ras_output as a TSV file.
Args:
rasScores : the computed ras scores.
- path : the output tsv file's path.
+ reactions : the list of reaction IDs, used as the first column.
Returns:
None
@@ -627,7 +631,7 @@
"""
supportedGenesInEncoding = geneTranslator[encoding]
if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
- raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
+ raise ValueError(f"Gene '{geneName}' not found. Please verify you are using the correct model.")
def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
"""
@@ -637,14 +641,7 @@
Returns:
Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
"""
- datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in galaxy as a .dat
-
- #try: filenamePath = utils.FilePath.fromStrPath(ARGS.model_upload_name) # file's name in input, to determine its original ext
- #except utils.PathErr as err:
- # utils.logWarning(f"Cannot determine file extension from filename '{ARGS.model_upload_name}'. Assuming tabular format.", ARGS.out_log)
- # filenamePath = None
-
- #if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
+ datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in Galaxy as a .dat
dict_rule = {}
@@ -658,7 +655,7 @@
id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
- # Proviamo prima con delimitatore tab
+ # First, try using a tab delimiter
for line in rows[1:]:
if len(line) <= idx_gpr:
utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
@@ -670,7 +667,7 @@
dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])
except Exception as e:
- # Se fallisce con tab, proviamo con virgola
+ # If parsing with tabs fails, try comma delimiter
try:
rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)
@@ -682,7 +679,7 @@
id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
- # Proviamo prima con delimitatore tab
+            # Retry parsing rows, this time assuming comma-separated values
for line in rows[1:]:
if len(line) <= idx_gpr:
utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
@@ -729,39 +726,7 @@
ARGS.out_log)
- ############
-
- # handle custom models
- #model :utils.Model = ARGS.rules_selector
-
- #if model is utils.Model.Custom:
- # rules = load_custom_rules()
- # reactions = list(rules.keys())
-
- # save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
- # if ERRORS: utils.logWarning(
- # f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
- # ARGS.out_log)
-
- # return
-
- # This is the standard flow of the ras_generator program, for non-custom models.
- #name = "RAS Dataset"
- #type_gene = gene_type(dataset.iloc[0, 0], name)
-
- #rules = model.getRules(ARGS.tool_dir)
- #genes = data_gene(dataset, type_gene, name, None)
- #ids, rules = load_id_rules(rules.get(type_gene))
-
- #resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
- #create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
-
- #if err: utils.logWarning(
- # f"Warning: gene(s) {err} not found in class \"{name}\", " +
- # "the expression level for this gene will be considered NaN",
- # ARGS.out_log)
-
- print("Execution succeded")
+ print("Execution succeeded")
###############################################################################
if __name__ == "__main__":
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/ras_to_bounds_beta.py
--- a/COBRAxy/ras_to_bounds_beta.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/ras_to_bounds_beta.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,3 +1,13 @@
+"""
+Apply RAS-based scaling to reaction bounds and optionally save updated models.
+
+Workflow:
+- Read one or more RAS matrices (patients/samples x reactions)
+- Normalize and merge them, optionally adding class suffixes to sample IDs
+- Build a COBRA model from a tabular CSV
+- Run FVA to initialize bounds, then scale per-sample based on RAS values
+- Save bounds per sample and optionally export updated models in chosen formats
+"""
import argparse
import utils.general_utils as utils
from typing import Optional, Dict, Set, List, Tuple, Union
@@ -5,13 +15,9 @@
import numpy as np
import pandas as pd
import cobra
-from cobra import Model, Reaction, Metabolite
-import re
+from cobra import Model
import sys
-import csv
from joblib import Parallel, delayed, cpu_count
-import utils.rule_parsing as rulesUtils
-import utils.reaction_parsing as reactionUtils
import utils.model_utils as modelUtils
################################# process args ###############################
@@ -181,7 +187,7 @@
df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"])
df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
df_medium = medium.rename(columns = {"reaction": "ReactionID"})
- df_medium["InMedium"] = True # flag per indicare la presenza nel medium
+ df_medium["InMedium"] = True
merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
@@ -264,7 +270,7 @@
modified_model = apply_bounds_to_model(model, new_bounds)
save_model(modified_model, cellName, save_models_path, save_models_format)
- pass
+ return
def generate_bounds_model(model: cobra.Model, ras=None, output_folder='output/', save_models=False, save_models_path='saved_models/', save_models_format='csv') -> pd.DataFrame:
"""
@@ -298,12 +304,12 @@
) for cellName, ras_row in ras.iterrows())
else:
raise ValueError("RAS DataFrame is None. Cannot generate bounds without RAS data.")
- pass
+ return
############################# main ###########################################
def main(args:List[str] = None) -> None:
"""
- Initializes everything and sets the program in motion based on the fronted input arguments.
+ Initialize and execute RAS-to-bounds pipeline based on the frontend input arguments.
Returns:
None
@@ -321,7 +327,7 @@
error_message = "Duplicated file names in the uploaded RAS matrices."
warning(error_message)
raise ValueError(error_message)
- pass
+
ras_class_names = []
for file in ras_file_names:
ras_class_names.append(file.rsplit(".", 1)[0])
@@ -334,7 +340,7 @@
ras = ras.T
ras = ras.astype(float)
if(len(ras_file_list)>1):
- #append class name to patient id (dataframe index)
+ # Append class name to patient id (DataFrame index)
ras.index = [f"{idx}_{ras_class_name}" for idx in ras.index]
else:
ras.index = [f"{idx}" for idx in ras.index]
@@ -343,9 +349,9 @@
class_assignments.loc[class_assignments.shape[0]] = [patient_id, ras_class_name]
- # Concatenate all ras DataFrames into a single DataFrame
+ # Concatenate all RAS DataFrames into a single DataFrame
ras_combined = pd.concat(ras_list, axis=0)
- # Normalize the RAS values by max RAS
+ # Normalize RAS values column-wise by max RAS
ras_combined = ras_combined.div(ras_combined.max(axis=0))
ras_combined.dropna(axis=1, how='all', inplace=True)
@@ -353,7 +359,7 @@
validation = modelUtils.validate_model(model)
- print("\n=== VALIDAZIONE MODELLO ===")
+ print("\n=== MODEL VALIDATION ===")
for key, value in validation.items():
print(f"{key}: {value}")
@@ -364,7 +370,7 @@
class_assignments.to_csv(ARGS.cell_class, sep='\t', index=False)
- pass
+ return
##############################################################################
if __name__ == "__main__":
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/rps_generator_beta.py
--- a/COBRAxy/rps_generator_beta.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/rps_generator_beta.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,3 +1,10 @@
+"""
+Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry.
+
+Given a tabular dataset (metabolites x samples) and a reaction set, this script
+maps metabolite names via synonyms, fills missing metabolites, and computes RPS
+per reaction for each sample using a log-normalized formula.
+"""
import math
import argparse
@@ -22,14 +29,14 @@
Returns:
Namespace: An object containing parsed arguments.
"""
- parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
- description = 'process some value\'s'+
- ' abundances and reactions to create RPS scores.')
+ parser = argparse.ArgumentParser(
+ usage='%(prog)s [options]',
+ description='Process abundances and reactions to create RPS scores.'
+ )
parser.add_argument("-rl", "--model_upload", type = str,
help = "path to input file containing the reactions")
- # model_upload custom
parser.add_argument('-td', '--tool_dir',
type = str,
required = True,
@@ -54,8 +61,8 @@
Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
Args:
- name_data : name associated with the dataset (from frontend input params)
- count : counter from 1 to make these names unique (external)
+ name_data: Name associated with the dataset (from frontend input params).
+ count: Counter starting at 1 to make names unique when default.
Returns:
str : the name made unique
@@ -81,7 +88,7 @@
Returns None if the cell index is invalid.
"""
if cell_line_index < 0 or cell_line_index >= len(dataset.index):
- print(f"Errore: This cell line index: '{cell_line_index}' is not valid.")
+ print(f"Error: cell line index '{cell_line_index}' is not valid.")
return None
cell_line_name = dataset.columns[cell_line_index]
@@ -138,7 +145,7 @@
list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1.
Side effects:
- dataset_by_rows : mut
+ dataset_by_rows: mutated to include missing metabolites with default abundances.
"""
missing_list = []
for reaction in reactions.values():
@@ -203,8 +210,8 @@
abundances_dict = {}
for row in dataset[1:]:
- id = get_metabolite_id(row[0], syn_dict) #if translationIsApplied else row[0]
- if id:
+ id = get_metabolite_id(row[0], syn_dict)
+ if id:
abundances_dict[id] = list(map(utils.Float(), row[1:]))
missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines)))
@@ -217,7 +224,8 @@
df = pd.DataFrame.from_dict(rps_scores)
df = df.loc[list(reactions.keys()),:]
- print(df.head(10))
+ # Optional preview: first 10 rows
+ # print(df.head(10))
df.index.name = 'Reactions'
df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True)
@@ -232,20 +240,17 @@
global ARGS
ARGS = process_args(args)
- # TODO:use utils functions vvv
+ # Load support data (black list and synonyms)
with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl:
black_list = pk.load(bl)
with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd:
syn_dict = pk.load(sd)
- dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False)
+ dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False)
tmp_dict = None
- #if ARGS.reaction_choice == 'default':
- # reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb'))
- # substrateFreqTable = pk.load(open(ARGS.tool_dir + '/local/pickle files/substrate_frequencies.pickle', 'rb'))
-
- #elif ARGS.reaction_choice == 'custom':
+
+ # Parse custom reactions from uploaded file
reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload)
for r, s in reactions.items():
tmp_list = list(s.keys())
@@ -258,20 +263,21 @@
if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0
substrateFreqTable[substrateName] += 1
- print(f"Reactions: {reactions}")
- print(f"Substrate Frequencies: {substrateFreqTable}")
- print(f"Synonyms: {syn_dict}")
+ # Debug prints (can be enabled during troubleshooting)
+ # print(f"Reactions: {reactions}")
+ # print(f"Substrate Frequencies: {substrateFreqTable}")
+ # print(f"Synonyms: {syn_dict}")
tmp_dict = {}
for metabName, freq in substrateFreqTable.items():
tmp_metabName = clean_metabolite_name(metabName)
for syn_key, syn_list in syn_dict.items():
if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key):
- print(f"Mapping {tmp_metabName} to {syn_key}")
+ # print(f"Mapping {tmp_metabName} to {syn_key}")
tmp_dict[syn_key] = syn_list
tmp_dict[syn_key].append(tmp_metabName)
rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable)
- print('Execution succeded')
+ print('Execution succeeded')
##############################################################################
if __name__ == "__main__": main()
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/utils/CBS_backend.py
--- a/COBRAxy/utils/CBS_backend.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/utils/CBS_backend.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,3 +1,10 @@
+"""
+CBS backend utilities using GLPK for constraint-based sampling.
+
+This module builds and solves LP problems from COBRA models, supports random
+objective function generation (CBS), and provides both swiglpk-based and
+COBRApy-based sampling fallbacks.
+"""
from swiglpk import *
import random
import pandas as pd
@@ -6,6 +13,20 @@
# Initialize LP problem
def initialize_lp_problem(S):
+ """
+ Prepare sparse matrix structures for GLPK given a stoichiometric matrix.
+
+ Args:
+ S: Stoichiometric matrix (DOK or similar) as returned by cobra.util.create_stoichiometric_matrix.
+
+ Returns:
+ tuple: (len_vector, values, indexes, ia, ja, ar, nrows, ncol)
+ - len_vector: number of non-zero entries
+ - values: list of non-zero values
+ - indexes: list of (row, col) indices for non-zero entries
+ - ia, ja, ar: GLPK-ready arrays for the sparse matrix
+ - nrows, ncol: matrix shape
+ """
len_vector=len(S.keys())
values=list(S.values())
@@ -29,9 +50,22 @@
-# Solve LP problem from the structure of the metabolic model
def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar,
obj_coefs,reactions,return_lp=False):
+ """
+ Create and solve a GLPK LP problem for a metabolic model.
+
+ Args:
+ lb, ub: Lower/upper bounds per reaction (lists of floats).
+ nrows, ncol: Dimensions of the S matrix.
+ len_vector, ia, ja, ar: Sparse matrix data prepared for GLPK.
+ obj_coefs: Objective function coefficients (list of floats).
+ reactions: Reaction identifiers (list of str), used for output mapping.
+ return_lp: If True, also return the GLPK problem object (caller must delete).
+
+ Returns:
+ tuple: (fluxes, Z) or (fluxes, Z, lp) if return_lp=True.
+ """
lp = glp_create_prob();
@@ -60,8 +94,18 @@
raise Exception(e)
-# Solve LP problem from the structure of the metabolic model
def solve_lp_problem(lp,obj_coefs,reactions):
+ """
+ Configure and solve an LP with GLPK using provided objective coefficients.
+
+ Args:
+ lp: GLPK problem handle.
+ obj_coefs: Objective coefficients.
+ reactions: Reaction identifiers (unused in computation; length used for output).
+
+ Returns:
+ tuple: (fluxes, Z) where fluxes is a list of primal column values and Z is the objective value.
+ """
# Set the coefficients of the objective function
i=1
@@ -101,8 +145,16 @@
# Re-enable terminal output after solving
glp_term_out(GLP_ON)
-# Create LP structure
def create_lp_structure(model):
+ """
+ Extract the LP structure (S matrix, bounds, objective) from a COBRA model.
+
+ Args:
+ model (cobra.Model): The COBRA model.
+
+ Returns:
+ tuple: (S, lb, ub, coefs_obj, reactions)
+ """
reactions=[el.id for el in model.reactions]
coefs_obj=[reaction.objective_coefficient for reaction in model.reactions]
@@ -116,8 +168,19 @@
return S,lb,ub,coefs_obj,reactions
-# CBS sampling interface
def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample):
+ """
+ Sample fluxes using GLPK by iterating over random objective functions.
+
+ Args:
+ model: COBRA model.
+ nsample: Number of samples to generate.
+ coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem).
+ df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions).
+
+ Returns:
+ None
+ """
S,lb,ub,coefs_obj,reactions = create_lp_structure(model)
len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S)
@@ -126,7 +189,7 @@
coefs_obj=coefficients_df.iloc[:,i].values
- if coefs_obj[-1]==1: #minimize
+ if coefs_obj[-1]==1: # minimize
coefs_obj= coefs_obj[0:-1] * -1
else:
coefs_obj=coefs_obj[0:-1]
@@ -134,17 +197,29 @@
fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector,
ia, ja, ar, coefs_obj,reactions,return_lp=False)
df_sample.loc[i] = fluxes
- pass
+ return
def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample):
+ """
+ Fallback sampling using COBRApy's optimize with per-sample randomized objectives.
+
+ Args:
+ model: COBRA model.
+ nsample: Number of samples to generate.
+ coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem).
+ df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions).
+
+ Returns:
+ None
+ """
for i in range(nsample):
dict_coeff={}
if(coefficients_df.iloc[-1][i]==1):
- type_problem = -1 #minimize
+ type_problem = -1 # minimize
else:
- type_problem = 1
+ type_problem = 1 # maximize
for rxn in [reaction.id for reaction in model.reactions]:
dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem
@@ -154,57 +229,69 @@
for rxn, flux in solution.items():
df_sample.loc[i][rxn] = flux
- pass
+ return
+
+def randomObjectiveFunction(model, n_samples, df_fva, seed=0):
+ """
+ Create random objective function coefficients for CBS sampling.
-# Create random coefficients for CBS
-def randomObjectiveFunction(model, n_samples, df_fva, seed=0):
-
+ The last row 'type_of_problem' encodes 0 for maximize and 1 for minimize.
+
+ Args:
+ model: COBRA model.
+ n_samples: Number of random objectives to generate.
+ df_fva: Flux Variability Analysis results with 'minimum' and 'maximum' per reaction.
+ seed: Seed for reproducibility.
- #reactions = model.reactions
- reactions = [reaction.id for reaction in model.reactions]
- cont=seed
- list_ex=reactions.copy()
- list_ex.append("type_of_problem")
- coefficients_df = pd.DataFrame(index=list_ex,columns=[str(i) for i in range(n_samples)])
+ Returns:
+ pandas.DataFrame: Coefficients DataFrame indexed by reaction IDs plus 'type_of_problem'.
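+
+    Example (illustrative sketch; assumes FVA has been run on the model):
+        from cobra.flux_analysis import flux_variability_analysis
+        df_fva = flux_variability_analysis(model, fraction_of_optimum=0)
+        coefficients_df = randomObjectiveFunction(model, 100, df_fva, seed=0)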
+ """
+ # reactions = model.reactions
+ reactions = [reaction.id for reaction in model.reactions]
+ cont = seed
+ list_ex = reactions.copy()
+ list_ex.append("type_of_problem")
+ coefficients_df = pd.DataFrame(index=list_ex, columns=[str(i) for i in range(n_samples)])
+
+ for i in range(0, n_samples):
- for i in range(0, n_samples):
-
- cont=cont+1
+ cont = cont + 1
+ random.seed(cont)
+
+ # Generate a random threshold in [0, 1]
+ threshold = random.random()
+
+ for reaction in reactions:
+
+ cont = cont + 1
random.seed(cont)
-
- # Genera un numero casuale tra 0 e 1
- threshold = random.random() #coefficiente tra 0 e 1
-
- for reaction in reactions:
+
+ val = random.random()
- cont=cont+1
+ if val > threshold:
+
+ cont = cont + 1
random.seed(cont)
-
- val=random.random()
-
- if val>threshold:
+
+ # Coefficient in [-1, 1]
+ c = 2 * random.random() - 1
- cont=cont+1
- random.seed(cont)
-
- c=2*random.random()-1 #coefficiente tra -1 e 1
-
- val_max = np.max([abs(df_fva.loc[reaction,"minimum"]), abs(df_fva.loc[reaction,"maximum"])])
+ val_max = np.max([abs(df_fva.loc[reaction, "minimum"]), abs(df_fva.loc[reaction, "maximum"])])
+
+ if val_max != 0: # only if FVA is non-zero
+ coefficients_df.loc[reaction, str(i)] = c / val_max # scale by FVA
+ else:
+ coefficients_df.loc[reaction, str(i)] = 0
- if val_max!=0: #solo se la fva è diversa da zero
- coefficients_df.loc[reaction,str(i)] = c/val_max #divido per la fva
- else:
- coefficients_df.loc[reaction,str(i)] = 0
+ else:
+ coefficients_df.loc[reaction, str(i)] = 0
- else:
- coefficients_df.loc[reaction,str(i)] = 0
+ cont = cont + 1
+ random.seed(cont)
- cont=cont+1
- random.seed(cont)
-
- if random.random()<0.5:
- coefficients_df.loc["type_of_problem",str(i)] = 0 #maximize
- else:
- coefficients_df.loc["type_of_problem",str(i)] = 1 #minimize
-
- return coefficients_df
+ if random.random() < 0.5:
+ coefficients_df.loc["type_of_problem", str(i)] = 0 # maximize
+ else:
+ coefficients_df.loc["type_of_problem", str(i)] = 1 # minimize
+
+ return coefficients_df
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/utils/general_utils.py
--- a/COBRAxy/utils/general_utils.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/utils/general_utils.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,3 +1,13 @@
+"""
+General utilities for COBRAxy.
+
+This module provides:
+- File and path helpers (FileFormat, FilePath)
+- Error and result handling utilities (CustomErr, Result)
+- Basic I/O helpers (CSV/TSV, pickle, SVG)
+- Lightweight CLI argument parsers (Bool, Float)
+- Model loader utilities for COBRA models, including compressed formats
+"""
import math
import re
import sys
@@ -7,11 +17,10 @@
from enum import Enum
from itertools import count
-from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple
+from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple
import pandas as pd
import cobra
-from cobra import Model as cobraModel, Reaction, Metabolite
import zipfile
import gzip
@@ -19,7 +28,7 @@
from io import StringIO
-
class ValueErr(Exception):
def __init__(self, param_name, expected, actual):
super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
@@ -32,21 +41,21 @@
"""
Encodes possible file extensions to conditionally save data in a different format.
"""
- DAT = ("dat",) # this is how galaxy treats all your files!
- CSV = ("csv",) # this is how most editable input data is written
- TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
- SVG = ("svg",) # this is how most metabolic maps are written
- PNG = ("png",) # this is a common output format for images (such as metabolic maps)
- PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.
-
- # Updated to include compressed variants
- XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
- JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
- MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
- YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed
+ DAT = ("dat",)
+ CSV = ("csv",)
+ TSV = ("tsv",)
+ SVG = ("svg",)
+ PNG = ("png",)
+ PDF = ("pdf",)
- TXT = ("txt",) # this is how most output data is written
- PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
+ # Compressed variants for common model formats
+ XML = ("xml", "xml.gz", "xml.zip", "xml.bz2")
+ JSON = ("json", "json.gz", "json.zip", "json.bz2")
+ MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2")
+ YML = ("yml", "yml.gz", "yml.zip", "yml.bz2")
+
+ TXT = ("txt",)
+ PICKLE = ("pickle", "pk", "p")
def __init__(self, *extensions):
self.extensions = extensions
@@ -83,11 +92,9 @@
Returns:
str : the string representation of the file extension.
"""
- # If we have an original extension stored (for compressed files only), use it
if hasattr(self, '_original_extension') and self._original_extension:
return self._original_extension
- # For XML, JSON, MAT and YML without original extension, use the base extension
if self == FileFormat.XML:
return "xml"
elif self == FileFormat.JSON:
@@ -101,18 +108,15 @@
class FilePath():
"""
- Represents a file path. View this as an attempt to standardize file-related operations by expecting
- values of this type in any process requesting a file path.
+ Represents a file path with format-aware helpers.
"""
def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
"""
- (Private) Initializes an instance of FilePath.
+ Initialize FilePath.
Args:
- path : the end of the path, containing the file name.
- ext : the file's extension.
- prefix : anything before path, if the last '/' isn't there it's added by the code.
- Returns:
- None : practically, a FilePath instance.
+            filePath: File name stem.
+ ext: File extension (FileFormat).
+ prefix: Optional directory path (trailing '/' auto-added).
"""
self.ext = ext
self.filePath = filePath
@@ -124,9 +128,7 @@
@classmethod
def fromStrPath(cls, path: str) -> "FilePath":
"""
- Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
- It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
- These double extensions are not supported for other file types such as .csv.
+ Parse a string path into a FilePath, supporting double extensions for models (e.g., .json.gz).
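+
+        Example (illustrative):
+            fp = FilePath.fromStrPath("data/model.json.gz")
+            # fp.ext is FileFormat.JSON; show() preserves the original
+            # double extension for compressed model files.
+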
Args:
path : the string containing the path
Raises:
@@ -141,27 +143,22 @@
prefix = result["prefix"] if result["prefix"] else ""
name, ext = result["name"], result["ext"]
- # Check for double extensions (json.gz, xml.zip, etc.)
parts = path.split(".")
if len(parts) >= 3:
penultimate = parts[-2]
last = parts[-1]
double_ext = f"{penultimate}.{last}"
- # Try the double extension first
try:
ext_format = FileFormat.fromExt(double_ext)
name = ".".join(parts[:-2])
- # Extract prefix if it exists
if '/' in name:
prefix = name[:name.rfind('/') + 1]
name = name[name.rfind('/') + 1:]
return cls(name, ext_format, prefix=prefix)
except ValueErr:
- # If double extension doesn't work, fall back to single extension
pass
- # Single extension fallback (original logic)
try:
ext_format = FileFormat.fromExt(ext)
return cls(name, ext_format, prefix=prefix)
@@ -198,19 +195,14 @@
newline is added by the function.
Args:
- s (str): The warning message to be logged and printed.
+ msg (str): The warning message to be logged and printed.
loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
immediately read back (beware: a relatively expensive operation, log with caution).
Returns:
None
"""
- # building the path and then reading it immediately seems useless, but it's actually a way of
- # validating that reduces repetition on the caller's side. Besides, logging a message by writing
- # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
- # mindlessly logging whenever something comes up, log at the very end and tell the user everything
- # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
- # the file only at the end of the program's execution.
+ # Note: validates path via FilePath; keep logging minimal to avoid overhead.
with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
class CustomErr(Exception):
@@ -238,15 +230,19 @@
def throw(self, loggerPath = "") -> None:
"""
- Raises the current CustomErr instance, logging a warning message before doing so.
+ Raises the current CustomErr instance, optionally logging it first.
+
+ Args:
+ loggerPath (str): Optional path to a log file to append this error before raising.
Raises:
self: The current CustomErr instance.
-
+
Returns:
None
"""
- if loggerPath: logWarning(str(self), loggerPath)
+ if loggerPath:
+ logWarning(str(self), loggerPath)
raise self
def abort(self) -> None:
@@ -316,7 +312,7 @@
"""
def __init__(self, value :Union[T, E], isOk :bool) -> None:
"""
- (Private) Initializes an instance of Result.
+ Initialize an instance of Result.
Args:
value (Union[T, E]): The value to be stored in the Result instance.
@@ -332,7 +328,7 @@
@classmethod
def Ok(cls, value :T) -> "Result":
"""
- Constructs a new Result instance with a successful operation.
+ Construct a successful Result.
Args:
value (T): The value to be stored in the Result instance, set as successful.
@@ -345,7 +341,7 @@
@classmethod
def Err(cls, value :E) -> "Result":
"""
- Constructs a new Result instance with a failed operation.
+ Construct a failed Result.
Args:
value (E): The value to be stored in the Result instance, set as failed.
@@ -437,35 +433,6 @@
return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
# FILES
-def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
- """
- Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
-
- Args:
- path : the path to the dataset file.
- datasetName : the name of the dataset.
-
- Raises:
- DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
- it has less than 2 columns.
-
- Returns:
- pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
- """
- # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
- # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
- # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
- # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
- # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
- # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
- try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
- except:
- try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
- except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
-
- if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
- return dataset
-
def readPickle(path :FilePath) -> Any:
"""
Reads the contents of a .pickle file, which needs to exist at the given path.
@@ -570,6 +537,7 @@
# UI ARGUMENTS
class Bool:
+ """Simple boolean CLI argument parser accepting 'true' or 'false' (case-insensitive)."""
def __init__(self, argName :str) -> None:
self.argName = argName
@@ -582,6 +550,7 @@
raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
class Float:
+ """Float CLI argument parser supporting NaN and None keywords (case-insensitive)."""
def __init__(self, argName = "Dataset values, not an argument") -> None:
self.argName = argName
@@ -607,7 +576,7 @@
ENGRO2_no_legend = "ENGRO2_no_legend"
HMRcore = "HMRcore"
HMRcore_no_legend = "HMRcore_no_legend"
- Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
+ Custom = "Custom"
def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
if not path: raise PathErr("<>", "it's necessary to provide a custom path when retrieving files from a custom model")
@@ -635,17 +604,20 @@
return readPickle(path)
def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
+ """Open the SVG metabolic map for this model."""
path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
self.__raiseMissingPathErr(path)
return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
+ """Load the COBRA model for this enum variant (supports Custom with explicit path/extension)."""
if(self is Model.Custom):
return self.load_custom_model(customPath, customExtension)
else:
return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
+ """Load a COBRA model from a custom path, supporting XML, JSON, MAT, and YML (compressed or not)."""
ext = ext if ext else file_path.ext
try:
if str(ext) in FileFormat.XML.value:
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/utils/model_utils.py
--- a/COBRAxy/utils/model_utils.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/utils/model_utils.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,14 +1,20 @@
+"""
+Utilities for generating and manipulating COBRA models and related metadata.
+
+This module includes helpers to:
+- extract rules, reactions, bounds, objective coefficients, and compartments
+- build a COBRA model from a tabular file
+- set objective and medium from dataframes
+- validate a model and convert gene identifiers
+- translate model GPRs using mapping tables
+"""
import os
-import csv
import cobra
-import pickle
-import argparse
import pandas as pd
import re
import logging
from typing import Optional, Tuple, Union, List, Dict, Set
from collections import defaultdict
-import utils.general_utils as utils
import utils.rule_parsing as rulesUtils
import utils.reaction_parsing as reactionUtils
from cobra import Model as cobraModel, Reaction, Metabolite
@@ -17,19 +23,16 @@
ReactionId = str
def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
"""
- Generates a dictionary mapping reaction ids to rules from the model.
+ Generate a dictionary mapping reaction IDs to GPR rules from the model.
Args:
- model : the model to derive data from.
- asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.
+ model: COBRA model to derive data from.
+ asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.
Returns:
- Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
- Dict[ReactionId, str] : the generated dictionary of raw rules.
+ Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
+ Dict[ReactionId, str]: Raw rules by reaction ID.
"""
- # Is the below approach convoluted? yes
- # Ok but is it inefficient? probably
- # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
_ruleGetter = lambda reaction : reaction.gene_reaction_rule
ruleExtractor = (lambda reaction :
rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
@@ -41,14 +44,14 @@
def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
"""
- Generates a dictionary mapping reaction ids to reaction formulas from the model.
+ Generate a dictionary mapping reaction IDs to reaction formulas from the model.
Args:
- model : the model to derive data from.
- asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are.
+ model: COBRA model to derive data from.
+ asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.
Returns:
- Dict[ReactionId, str] : the generated dictionary.
+ Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
"""
unparsedReactions = {
@@ -62,6 +65,12 @@
return reactionUtils.create_reaction_dict(unparsedReactions)
def get_medium(model:cobraModel) -> pd.DataFrame:
+ """
+ Extract the uptake reactions representing the model medium.
+
+ Returns a DataFrame with a single column 'reaction' listing exchange reactions
+ with negative lower bound and no positive stoichiometric coefficients (uptake only).
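+
+    Example (illustrative):
+        medium_df = get_medium(model)
+        print(medium_df["reaction"].tolist())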
+ """
trueMedium=[]
for r in model.reactions:
positiveCoeff=0
@@ -77,16 +86,16 @@
def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
"""
- Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello.
-
+ Extract objective coefficients for each reaction.
+
Args:
- model : cobra.Model
-
+ model: COBRA model
+
Returns:
- pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient
+ pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
"""
coeffs = []
- # model.objective.expression è un'espressione lineare
+ # model.objective.expression is a linear expression
objective_expr = model.objective.expression.as_coefficients_dict()
for reaction in model.reactions:
@@ -99,6 +108,12 @@
return pd.DataFrame(coeffs)
def generate_bounds(model:cobraModel) -> pd.DataFrame:
+ """
+ Build a DataFrame of lower/upper bounds for all reactions.
+
+ Returns:
+ pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
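+
+    Example (illustrative; the reaction ID is hypothetical):
+        bounds_df = generate_bounds(model)
+        lb, ub = bounds_df.loc["EX_glc__D_e"]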
+ """
rxns = []
for reaction in model.reactions:
@@ -160,45 +175,38 @@
def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
"""
- Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
-
+ Build a COBRApy model from a tabular file with reaction data.
+
Args:
- csv_path: Path al file CSV (separato da tab)
- model_id: ID del modello da creare
-
+ csv_path: Path to the tab-separated file.
+ model_id: ID for the newly created model.
+
Returns:
- cobra.Model: Il modello COBRApy costruito
+ cobra.Model: The constructed COBRApy model.
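+
+    Example (illustrative; the file name is hypothetical, and the table must
+    provide ReactionID, Formula, lower_bound, upper_bound, GPR,
+    ObjectiveCoefficient and InMedium columns):
+        model = build_cobra_model_from_csv("reactions.tsv", model_id="my_model")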
"""
- # Leggi i dati dal CSV
df = pd.read_csv(csv_path, sep='\t')
- # Crea il modello vuoto
model = cobraModel(model_id)
- # Dict per tenere traccia di metaboliti e compartimenti
metabolites_dict = {}
compartments_dict = {}
- print(f"Costruendo modello da {len(df)} reazioni...")
+ print(f"Building model from {len(df)} reactions...")
- # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
for idx, row in df.iterrows():
reaction_formula = str(row['Formula']).strip()
if not reaction_formula or reaction_formula == 'nan':
continue
- # Estrai metaboliti dalla formula della reazione
metabolites = extract_metabolites_from_reaction(reaction_formula)
for met_id in metabolites:
compartment = extract_compartment_from_metabolite(met_id)
- # Aggiungi compartimento se non esiste
if compartment not in compartments_dict:
compartments_dict[compartment] = compartment
- # Aggiungi metabolita se non esiste
if met_id not in metabolites_dict:
metabolites_dict[met_id] = Metabolite(
id=met_id,
@@ -206,15 +214,12 @@
name=met_id.replace(f"_{compartment}", "").replace("__", "_")
)
- # Aggiungi compartimenti al modello
model.compartments = compartments_dict
- # Aggiungi metaboliti al modello
model.add_metabolites(list(metabolites_dict.values()))
- print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
+ print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")
- # Seconda passata: aggiungi le reazioni
reactions_added = 0
reactions_skipped = 0
@@ -223,44 +228,37 @@
reaction_id = str(row['ReactionID']).strip()
reaction_formula = str(row['Formula']).strip()
- # Salta reazioni senza formula
if not reaction_formula or reaction_formula == 'nan':
- raise ValueError(f"Formula della reazione mancante {reaction_id}")
+ raise ValueError(f"Missing reaction formula for {reaction_id}")
- # Crea la reazione
reaction = Reaction(reaction_id)
reaction.name = reaction_id
- # Imposta bounds
reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
- # Aggiungi gene rule se presente
if pd.notna(row['GPR']) and str(row['GPR']).strip():
reaction.gene_reaction_rule = str(row['GPR']).strip()
- # Parse della formula della reazione
try:
parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
except Exception as e:
- print(f"Errore nel parsing della reazione {reaction_id}: {e}")
+ print(f"Error parsing reaction {reaction_id}: {e}")
reactions_skipped += 1
continue
- # Aggiungi la reazione al modello
model.add_reactions([reaction])
reactions_added += 1
- print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
+ print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")
# set objective function
set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
- # Imposta il medium
set_medium_from_data(model, df)
- print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
+ print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")
return model
@@ -268,12 +266,12 @@
# Extract all metabolite IDs from the formula (handles numeric prefixes + underscore)
def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
"""
- Estrae gli ID dei metaboliti da una formula di reazione.
- Pattern robusto: cattura token che terminano con _ (es. _c, _m, _e)
- e permette che comincino con cifre o underscore.
+ Extract metabolite IDs from a reaction formula.
+ Robust pattern: tokens ending with _ (e.g., _c, _m, _e),
+ allowing leading digits/underscores.
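+
+    Example (illustrative):
+        extract_metabolites_from_reaction("2.0 h_c + nadh_c --> nad_c")
+        # -> {'h_c', 'nadh_c', 'nad_c'}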
"""
metabolites = set()
- # coefficiente opzionale seguito da un token che termina con _
+ # optional coefficient followed by a token ending with _
pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
matches = re.findall(pattern, reaction_formula)
metabolites.update(matches)
@@ -281,54 +279,39 @@
def extract_compartment_from_metabolite(metabolite_id: str) -> str:
- """
- Estrae il compartimento dall'ID del metabolita.
- """
- # Il compartimento è solitamente l'ultima lettera dopo l'underscore
+ """Extract the compartment from a metabolite ID."""
if '_' in metabolite_id:
return metabolite_id.split('_')[-1]
return 'c' # default cytoplasm
def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
- """
- Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
- """
+ """Parse a reaction formula and set metabolites with their coefficients."""
- if reaction.id == 'EX_thbpt_e':
- print(reaction.id)
- print(formula)
- # Dividi in parte sinistra e destra
if '<=>' in formula:
left, right = formula.split('<=>')
reversible = True
elif '<--' in formula:
left, right = formula.split('<--')
reversible = False
- left, right = left, right
elif '-->' in formula:
left, right = formula.split('-->')
reversible = False
elif '<-' in formula:
left, right = formula.split('<-')
reversible = False
- left, right = left, right
else:
- raise ValueError(f"Formato reazione non riconosciuto: {formula}")
+ raise ValueError(f"Unrecognized reaction format: {formula}")
- # Parse dei metaboliti e coefficienti
reactants = parse_metabolites_side(left.strip())
products = parse_metabolites_side(right.strip())
- # Aggiungi metaboliti alla reazione
metabolites_to_add = {}
- # Reagenti (coefficienti negativi)
for met_id, coeff in reactants.items():
if met_id in metabolites_dict:
metabolites_to_add[metabolites_dict[met_id]] = -coeff
- # Prodotti (coefficienti positivi)
for met_id, coeff in products.items():
if met_id in metabolites_dict:
metabolites_to_add[metabolites_dict[met_id]] = coeff
@@ -337,9 +320,7 @@
def parse_metabolites_side(side_str: str) -> Dict[str, float]:
- """
- Parsa un lato della reazione per estrarre metaboliti e coefficienti.
- """
+ """Parse one side of a reaction and extract metabolites with coefficients."""
metabolites = {}
if not side_str or side_str.strip() == '':
return metabolites
@@ -350,7 +331,7 @@
if not term:
continue
- # pattern allineato: coefficiente opzionale + id che termina con _
+ # optional coefficient + id ending with _
match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
if match:
coeff_str, met_id = match.groups()
@@ -372,7 +353,6 @@
reaction_id = str(row['ReactionID']).strip()
coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
if coeff != 0:
- # Assicuriamoci che la reazione esista nel modello
if reaction_id in model.reactions:
obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
else:
@@ -381,7 +361,6 @@
if not obj_dict:
raise ValueError("No reactions found with non-zero objective coefficient.")
- # Imposta la funzione obiettivo
model.objective = obj_dict
print(f"Objective set with {len(obj_dict)} reactions.")
@@ -389,9 +368,7 @@
def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
- """
- Imposta il medium basato sulla colonna InMedium.
- """
+ """Set the medium based on the 'InMedium' column in the dataframe."""
medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
medium_dict = {}
@@ -403,13 +380,11 @@
if medium_dict:
model.medium = medium_dict
- print(f"Medium impostato con {len(medium_dict)} componenti")
+ print(f"Medium set with {len(medium_dict)} components")
def validate_model(model: cobraModel) -> Dict[str, any]:
- """
- Valida il modello e fornisce statistiche di base.
- """
+ """Validate the model and return basic statistics."""
validation = {
'num_reactions': len(model.reactions),
'num_metabolites': len(model.metabolites),
@@ -422,7 +397,7 @@
}
try:
- # Test di crescita
+ # Growth test
solution = model.optimize()
validation['growth_rate'] = solution.objective_value
validation['status'] = solution.status
@@ -432,7 +407,8 @@
return validation
-def convert_genes(model,annotation):
+def convert_genes(model, annotation):
+ """Rename genes using a selected annotation key in gene.notes; returns a model copy."""
from cobra.manipulation import rename_genes
model2=model.copy()
try:
@@ -450,12 +426,12 @@
def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
"""
- Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...}
- Lancia ValueError se non trova almeno un mapping utile.
+ Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
+ Raise ValueError if no suitable mapping is found.
"""
cols = { _normalize_colname(c): c for c in mapping_df.columns }
chosen = {}
- # possibili nomi per ciascuna categoria
+ # candidate names for each category
candidates = {
'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
@@ -476,13 +452,8 @@
model_source_genes: Optional[Set[str]] = None,
logger: Optional[logging.Logger] = None) -> None:
"""
- Verifica che, nel mapping_df (eventualmente già filtrato sui source di interesse),
- ogni target sia associato ad al massimo un source. Se trova target associati a
- >1 source solleva ValueError mostrando esempi.
-
- - mapping_df: DataFrame con colonne source_col, target_col
- - model_source_genes: se fornito, è un set di source normalizzati che stiamo traducendo
- (se None, si usa tutto mapping_df)
+ Check that, within the filtered mapping_df, each target maps to at most one source.
+ Log examples if duplicates are found.
"""
if logger is None:
logger = logging.getLogger(__name__)
@@ -491,12 +462,12 @@
logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
return
- # normalizza le colonne temporanee per gruppi (senza modificare il df originale)
+ # normalize temporary columns for grouping (without altering the original df)
tmp = mapping_df[[source_col, target_col]].copy()
tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id)
tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
- # se è passato un insieme di geni modello, filtra qui (già fatto nella chiamata, ma doppio-check ok)
+ # optionally filter to the set of model source genes
if model_source_genes is not None:
tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
@@ -504,27 +475,27 @@
logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
return
- # costruisci il reverse mapping target -> set(sources)
+ # build reverse mapping: target -> set(sources)
grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
- # trova target con più di 1 source
+ # find targets with more than one source
problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
if problematic:
- # prepara messaggio di errore con esempi (fino a 20)
+        # prepare warning message listing the problematic targets
sample_items = list(problematic.items())
msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
for tgt, sources in sample_items:
msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}")
full_msg = "\n".join(msg_lines)
- # loggare e sollevare errore
+ # log warning
logger.warning(full_msg)
+        return  # duplicates found; skip the success message below
- # se tutto ok
+ # if everything is fine
logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
def _normalize_gene_id(g: str) -> str:
- """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip)."""
+ """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
if g is None:
return ""
g = str(g).strip()
@@ -535,30 +506,30 @@
def _simplify_boolean_expression(expr: str) -> str:
"""
- Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze.
- Gestisce espressioni con 'and' e 'or'.
+ Simplify a boolean expression by removing duplicates and redundancies.
+ Handles expressions with 'and' and 'or'.
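+
+    Example (illustrative):
+        _simplify_boolean_expression("A and A and (B or B)")  # -> "A and B"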
"""
if not expr or not expr.strip():
return expr
- # Normalizza gli operatori
+ # normalize operators
expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
- # Funzione ricorsiva per processare le espressioni
+ # recursive helper to process expressions
def process_expression(s: str) -> str:
s = s.strip()
if not s:
return s
- # Gestisci le parentesi
+ # handle parentheses
while '(' in s:
- # Trova la parentesi più interna
+ # find the innermost parentheses
start = -1
for i, c in enumerate(s):
if c == '(':
start = i
elif c == ')' and start != -1:
- # Processa il contenuto tra parentesi
+ # process inner content
inner = s[start+1:i]
processed_inner = process_expression(inner)
s = s[:start] + processed_inner + s[i+1:]
@@ -566,7 +537,7 @@
else:
break
- # Dividi per 'or' al livello più alto
+ # split by 'or' at top level
or_parts = []
current_part = ""
paren_count = 0
@@ -590,10 +561,10 @@
if current_part.strip():
or_parts.append(current_part.strip())
- # Processa ogni parte OR
+ # process each OR part
processed_or_parts = []
for or_part in or_parts:
- # Dividi per 'and' all'interno di ogni parte OR
+ # split by 'and' within each OR part
and_parts = []
current_and = ""
paren_count = 0
@@ -617,7 +588,7 @@
if current_and.strip():
and_parts.append(current_and.strip())
- # Rimuovi duplicati nelle parti AND
+ # deduplicate AND parts
unique_and_parts = list(dict.fromkeys(and_parts)) # preserves order
if len(unique_and_parts) == 1:
@@ -625,7 +596,7 @@
elif len(unique_and_parts) > 1:
processed_or_parts.append(" and ".join(unique_and_parts))
- # Rimuovi duplicati nelle parti OR
+ # deduplicate OR parts
unique_or_parts = list(dict.fromkeys(processed_or_parts))
if len(unique_or_parts) == 1:
@@ -638,7 +609,7 @@
try:
return process_expression(expr)
except Exception:
- # Se la semplificazione fallisce, ritorna l'espressione originale
+ # if simplification fails, return the original expression
return expr
# ---------- Main public function ----------
@@ -649,13 +620,16 @@
allow_many_to_one: bool = False,
logger: Optional[logging.Logger] = None) -> 'cobra.Model':
"""
- Translate model genes from source_nomenclature to target_nomenclature.
- mapping_df should contain at least columns that allow the mapping
- (e.g. ensg, hgnc_id, hgnc_symbol, entrez).
-
+ Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
+ mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
+
Args:
- allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target
- e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target.
+ model: COBRA model to translate.
+ mapping_df: DataFrame containing the mapping information.
+ target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
+ source_nomenclature: Current source key in the model (default 'hgnc_id').
+ allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
+ logger: Optional logger.
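+
+    Example (illustrative sketch; the mapping file name is hypothetical):
+        mapping_df = pd.read_csv("gene_mapping.tsv", sep="\t")
+        model_sym = translate_model_genes(
+            model, mapping_df,
+            target_nomenclature="hgnc_symbol",
+            source_nomenclature="ensg")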
"""
if logger is None:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -711,11 +685,9 @@
filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
- # Se non ci sono righe rilevanti, avvisa
if filtered_map.empty:
logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
- # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one ---
if not allow_many_to_one:
_validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
@@ -739,7 +711,6 @@
if gpr and gpr.strip():
new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
if new_gpr != gpr:
- # Semplifica l'espressione booleana per rimuovere duplicati
simplified_gpr = _simplify_boolean_expression(new_gpr)
if simplified_gpr != new_gpr:
stats['simplified_gprs'] += 1
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/utils/reaction_parsing.py
--- a/COBRAxy/utils/reaction_parsing.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/utils/reaction_parsing.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,15 +1,22 @@
+"""
+Helpers to parse reaction strings into structured dictionaries.
+
+Features:
+- Reaction direction detection (forward, backward, reversible)
+- Parsing of custom reaction strings into stoichiometric maps
+- Conversion of a dict of raw reactions into a directional reactions dict
+- Loading custom reactions from a tabular file (TSV)
+"""
from enum import Enum
import utils.general_utils as utils
from typing import Dict
-import csv
import re
# Reaction direction encoding:
class ReactionDir(Enum):
"""
- A reaction can go forwards, backwards or be reversible (able to proceed in both directions).
- Models created / managed with cobrapy encode this information within the reaction's
- formula using the arrows this enum keeps as values.
+ A reaction can go forward, backward, or be reversible (both directions).
+ Cobrapy-style formulas encode direction using specific arrows handled here.
"""
FORWARD = "-->"
BACKWARD = "<--"
@@ -40,34 +47,29 @@
def add_custom_reaction(reactionsDict :ReactionsDict, rId :str, reaction :str) -> None:
"""
- Adds an entry to the given reactionsDict. Each entry consists of a given unique reaction id
- (key) and a :dict (value) matching each substrate in the reaction to its stoichiometric coefficient.
- Keys and values are both obtained from the reaction's formula: if a substrate (custom metabolite id)
- appears without an explicit coeff, the value 1.0 will be used instead.
+ Add one reaction entry to reactionsDict.
+
+ The entry maps each substrate ID to its stoichiometric coefficient.
+ If a substrate appears without an explicit coefficient, 1.0 is assumed.
Args:
- reactionsDict : dictionary encoding custom reactions information.
- rId : unique reaction id.
- reaction : the reaction's formula.
+ reactionsDict: Dict to update in place.
+ rId: Unique reaction ID.
+ reaction: Reaction formula string.
Returns:
None
- Side effects:
- reactionsDict : mut
+ Side effects: updates reactionsDict in place.
"""
reaction = reaction.strip()
if not reaction: return
reactionsDict[rId] = {}
- # We assume the '+' separating consecutive metabs in a reaction is spaced from them,
- # to avoid confusing it for electrical charge:
+ # Assumes ' + ' is spaced to avoid confusion with charge symbols.
for word in reaction.split(" + "):
metabId, stoichCoeff = word, 1.0
- # Implicit stoichiometric coeff is equal to 1, some coeffs are floats.
-
- # Accepted coeffs can be integer or floats with a dot (.) decimal separator
- # and must be separated from the metab with a space:
+ # Coefficient can be integer or float (dot decimal) and must be space-separated.
foundCoeff = re.search(r"\d+(\.\d+)? ", word)
if foundCoeff:
wholeMatch = foundCoeff.group(0)
@@ -81,48 +83,39 @@
def create_reaction_dict(unparsed_reactions: Dict[str, str]) -> ReactionsDict:
"""
- Parses the given dictionary into the correct format.
+ Parse a dict of raw reaction strings into a directional reactions dict.
Args:
- unparsed_reactions (Dict[str, str]): A dictionary where keys are reaction IDs and values are unparsed reaction strings.
+ unparsed_reactions: Mapping reaction ID -> raw reaction string.
Returns:
- ReactionsDict: The correctly parsed dict.
+ ReactionsDict: Parsed dict. Reversible reactions produce two entries with _F and _B suffixes.
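+
+    Example (illustrative):
+        create_reaction_dict({"R1": "a_c + b_c <=> c_c"})
+        # -> {"R1_F": {"a_c": 1.0, "b_c": 1.0}, "R1_B": {"c_c": 1.0}}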
"""
reactionsDict :ReactionsDict = {}
for rId, reaction in unparsed_reactions.items():
reactionDir = ReactionDir.fromReaction(reaction)
left, right = reaction.split(f" {reactionDir.value} ")
- # Reversible reactions are split into distinct reactions, one for each direction.
- # In general we only care about substrates, the product information is lost.
+ # Reversible reactions are split into two: forward (_F) and backward (_B).
reactionIsReversible = reactionDir is ReactionDir.REVERSIBLE
if reactionDir is not ReactionDir.BACKWARD:
add_custom_reaction(reactionsDict, rId + "_F" * reactionIsReversible, left)
if reactionDir is not ReactionDir.FORWARD:
add_custom_reaction(reactionsDict, rId + "_B" * reactionIsReversible, right)
-
- # ^^^ to further clarify: if a reaction is NOT reversible it will not be marked as _F or _B
- # and whichever direction we DO keep (forward if --> and backward if <--) loses this information.
- # This IS a small problem when coloring the map in marea.py because the arrow IDs in the map follow
- # through with a similar convention on ALL reactions and correctly encode direction based on their
- # model of origin. TODO: a proposed solution is to unify the standard in RPS to fully mimic the maps,
- # which involves re-writing the "reactions" dictionary.
return reactionsDict
def parse_custom_reactions(customReactionsPath :str) -> ReactionsDict:
"""
- Creates a custom dictionary encoding reactions information from a csv file containing
- data about these reactions, the path of which is given as input.
+ Load custom reactions from a tabular file and parse into a reactions dict.
Args:
- customReactionsPath : path to the reactions information file.
+        customReactionsPath: Path to the reactions file (tab-separated).
Returns:
- ReactionsDict : dictionary encoding custom reactions information.
+ ReactionsDict: Parsed reactions dictionary.
"""
try:
rows = utils.readCsv(utils.FilePath.fromStrPath(customReactionsPath), delimiter = "\t", skipHeader=False)
@@ -132,7 +125,7 @@
id_idx, idx_formula = utils.findIdxByName(rows[0], "Formula")
except Exception as e:
-
+ # Fallback re-read with same settings; preserves original behavior
rows = utils.readCsv(utils.FilePath.fromStrPath(customReactionsPath), delimiter = "\t", skipHeader=False)
if len(rows) <= 1:
raise ValueError("The custom reactions file must contain at least one reaction.")
diff -r 4e2bc80764b6 -r a6e45049c1b9 COBRAxy/utils/rule_parsing.py
--- a/COBRAxy/utils/rule_parsing.py Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/utils/rule_parsing.py Fri Sep 12 17:28:45 2025 +0000
@@ -1,10 +1,20 @@
+"""
+Parsing utilities for gene rules (GPRs).
+
+This module provides:
+- RuleErr: structured errors for malformed rules
+- RuleOp: valid logical operators (AND/OR)
+- OpList: nested list structure representing parsed rules with explicit operator
+- RuleStack: helper stack to build nested OpLists during parsing
+- parseRuleToNestedList: main entry to parse a rule string into an OpList
+"""
from enum import Enum
import utils.general_utils as utils
from typing import List, Union, Optional
class RuleErr(utils.CustomErr):
"""
- CustomErr subclass for rule syntax errors.
+ Error type for rule syntax errors.
"""
errName = "Rule Syntax Error"
def __init__(self, rule :str, msg = "no further details provided") -> None:
@@ -14,7 +24,7 @@
class RuleOp(Enum):
"""
- Encodes all operators valid in gene rules.
+ Valid logical operators for gene rules.
"""
OR = "or"
AND = "and"
@@ -27,7 +37,7 @@
class OpList(List[Union[str, "OpList"]]):
"""
- Represents a parsed rule and each of its nesting levels, including the operator that level uses.
+ Parsed rule structure: a list with an associated operator for that level.
"""
def __init__(self, op :Optional[RuleOp] = None) -> None:
"""
@@ -70,8 +80,7 @@
class RuleStack:
"""
- FILO stack structure to save the intermediate representation of a Rule during parsing, with the
- current nesting level at the top of the stack.
+ FILO stack used during parsing to build nested OpLists; the top is the current level.
"""
def __init__(self) -> None:
"""
@@ -177,51 +186,49 @@
def parseRuleToNestedList(rule :str) -> OpList:
"""
- Parse a single rule from its string representation to an OpList, making all priority explicit
- through nesting levels.
+ Parse a rule string into an OpList, making operator precedence explicit via nesting.
Args:
- rule : the string representation of a rule to be parsed.
+ rule: Rule string to parse (supports parentheses, 'and', 'or').
Raises:
- RuleErr : whenever something goes wrong during parsing.
+ RuleErr: If the rule is malformed (e.g., mismatched parentheses or misplaced operators).
Returns:
- OpList : the parsed rule.
+ OpList: Parsed rule as an OpList structure.
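+
+    Example (illustrative):
+        parsed = parseRuleToNestedList("A and (B or C)")
+        # parsed is an OpList with op 'and', containing 'A' and a nested
+        # OpList with op 'or' containing 'B' and 'C'.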
"""
source = iter(rule
- .replace("(", "( ").replace(")", " )") # Single out parens as words
- .strip() # remove whitespace at extremities
+ .replace("(", "( ").replace(")", " )") # single out parentheses as words
+ .strip() # trim edges
.split()) # split by spaces
stack = RuleStack()
nestingErr = RuleErr(rule, "mismatch between open and closed parentheses")
try:
- while True: # keep reading until source ends
+ while True: # read until source ends
while True:
- operand = next(source, None) # expected name or rule opening
+ operand = next(source, None) # expect operand or '('
if operand is None: raise RuleErr(rule, "found trailing open parentheses")
- if operand == "and" or operand == "or" or operand == ")": # found operator instead, panic
+ if operand in ("and", "or", ")"): # unexpected operator position
raise RuleErr(rule, f"found \"{operand}\" in unexpected position")
- if operand != "(": break # found name
+ if operand != "(": break # got a name
- # found rule opening, we add new nesting level but don't know the operator
+ # found rule opening: add a new nesting level
stack.push()
stack.current.append(operand)
- while True: # keep reading until operator is found or source ends
- operator = next(source, None) # expected operator or rule closing
- if operator and operator != ")": break # found operator
+ while True: # read until operator found or source ends
+ operator = next(source, None) # expect operator or ')'
+ if operator and operator != ")": break # got operator
- if stack.currentIsAnd(): stack.pop() # we close the "and" chain
+ if stack.currentIsAnd(): stack.pop() # close current AND chain
if not operator: break
- stack.pop() # we close the parentheses
+ stack.pop() # close parentheses
- # we proceed with operator:
- if not operator: break # there is no such thing as a double loop break.. yet
+ if not operator: break
if not RuleOp.isOperator(operator): raise RuleErr(
rule, f"found \"{operator}\" in unexpected position, expected operator")
@@ -234,7 +241,7 @@
stack.push(operator)
stack.popForward()
- stack.current.setOpIfMissing(operator) # buffer now knows what operator its data had
+ stack.current.setOpIfMissing(operator)
except RuleErr as err: raise err # bubble up proper errors
except: raise nestingErr # everything else is interpreted as a nesting error.