# HG changeset patch
# User francesco_lapi
# Date 1757069616 0
# Node ID f73d57641124042a2a351f7634bf10f0214b46a6
# Parent 4e53b178031ae415f4e003be4cf6d8d4054f2a60
Uploaded
diff -r 4e53b178031a -r f73d57641124 COBRAxy/custom_data_generator.xml
--- a/COBRAxy/custom_data_generator.xml Fri Sep 05 10:50:12 2025 +0000
+++ b/COBRAxy/custom_data_generator.xml Fri Sep 05 10:53:36 2025 +0000
@@ -26,6 +26,9 @@
#if $cond_model.medium_selector == 'Custom'
--medium $cond_model.medium
#end if
+
+ --gene_format $gene_format
+
--out_log $log
--out_tabular $out_tabular
]]>
@@ -80,6 +83,14 @@
+
+
+
+
+
+
+
+
diff -r 4e53b178031a -r f73d57641124 COBRAxy/utils/general_utils.py
--- a/COBRAxy/utils/general_utils.py Fri Sep 05 10:50:12 2025 +0000
+++ b/COBRAxy/utils/general_utils.py Fri Sep 05 10:53:36 2025 +0000
@@ -1,701 +1,701 @@
-import math
-import re
-import sys
-import csv
-import pickle
-import lxml.etree as ET
-
-from enum import Enum
-from itertools import count
-from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union
-
-import pandas as pd
-import cobra
-
-import zipfile
-import gzip
-import bz2
-from io import StringIO
-
-class ValueErr(Exception):
- def __init__(self, param_name, expected, actual):
- super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
-
-class PathErr(Exception):
- def __init__(self, path, message):
- super().__init__(f"Path error for '{path}': {message}")
-
-class FileFormat(Enum):
- """
- Encodes possible file extensions to conditionally save data in a different format.
- """
- DAT = ("dat",) # this is how galaxy treats all your files!
- CSV = ("csv",) # this is how most editable input data is written
- TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
- SVG = ("svg",) # this is how most metabolic maps are written
- PNG = ("png",) # this is a common output format for images (such as metabolic maps)
- PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.
-
- # Updated to include compressed variants
- XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
- JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
- MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
- YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed
-
- TXT = ("txt",) # this is how most output data is written
- PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
-
- def __init__(self, *extensions):
- self.extensions = extensions
- # Store original extension when set via fromExt
- self._original_extension = None
-
- @classmethod
- def fromExt(cls, ext: str) -> "FileFormat":
- """
- Converts a file extension string to a FileFormat instance.
- Args:
- ext : The file extension as a string.
- Returns:
- FileFormat: The FileFormat instance corresponding to the file extension.
- """
- variantName = ext.upper()
- if variantName in FileFormat.__members__:
- instance = FileFormat[variantName]
- instance._original_extension = ext
- return instance
-
- variantName = ext.lower()
- for member in cls:
- if variantName in member.value:
- # Create a copy-like behavior by storing the original extension
- member._original_extension = ext
- return member
-
- raise ValueErr("ext", "a valid FileFormat file extension", ext)
-
- def __str__(self) -> str:
- """
- (Private) converts to str representation. Good practice for usage with argparse.
- Returns:
- str : the string representation of the file extension.
- """
- # If we have an original extension stored (for compressed files only), use it
- if hasattr(self, '_original_extension') and self._original_extension:
- return self._original_extension
-
- # For XML, JSON, MAT and YML without original extension, use the base extension
- if self == FileFormat.XML:
- return "xml"
- elif self == FileFormat.JSON:
- return "json"
- elif self == FileFormat.MAT:
- return "mat"
- elif self == FileFormat.YML:
- return "yml"
-
- return self.value[-1]
-
-class FilePath():
- """
- Represents a file path. View this as an attempt to standardize file-related operations by expecting
- values of this type in any process requesting a file path.
- """
- def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
- """
- (Private) Initializes an instance of FilePath.
- Args:
- path : the end of the path, containing the file name.
- ext : the file's extension.
- prefix : anything before path, if the last '/' isn't there it's added by the code.
- Returns:
- None : practically, a FilePath instance.
- """
- self.ext = ext
- self.filePath = filePath
-
- if prefix and prefix[-1] != '/':
- prefix += '/'
- self.prefix = prefix
-
- @classmethod
- def fromStrPath(cls, path: str) -> "FilePath":
- """
- Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
- It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
- These double extensions are not supported for other file types such as .csv.
- Args:
- path : the string containing the path
- Raises:
- PathErr : if the provided string doesn't represent a valid path.
- Returns:
- FilePath : the constructed instance.
- """
- result = re.search(r"^(?P.*\/)?(?P.*)\.(?P[^.]*)$", path)
- if not result or not result["name"] or not result["ext"]:
- raise PathErr(path, "cannot recognize folder structure or extension in path")
-
- prefix = result["prefix"] if result["prefix"] else ""
- name, ext = result["name"], result["ext"]
-
- # Check for double extensions (json.gz, xml.zip, etc.)
- parts = path.split(".")
- if len(parts) >= 3:
- penultimate = parts[-2]
- last = parts[-1]
- double_ext = f"{penultimate}.{last}"
-
- # Try the double extension first
- try:
- ext_format = FileFormat.fromExt(double_ext)
- name = ".".join(parts[:-2])
- # Extract prefix if it exists
- if '/' in name:
- prefix = name[:name.rfind('/') + 1]
- name = name[name.rfind('/') + 1:]
- return cls(name, ext_format, prefix=prefix)
- except ValueErr:
- # If double extension doesn't work, fall back to single extension
- pass
-
- # Single extension fallback (original logic)
- try:
- ext_format = FileFormat.fromExt(ext)
- return cls(name, ext_format, prefix=prefix)
- except ValueErr:
- raise PathErr(path, f"unsupported file extension: {ext}")
-
- def show(self) -> str:
- """
- Shows the path as a string.
- Returns:
- str : the path shown as a string.
- """
- return f"{self.prefix}{self.filePath}.{self.ext}"
-
- def __str__(self) -> str:
- return self.show()
-
-# ERRORS
-def terminate(msg :str) -> None:
- """
- Terminate the execution of the script with an error message.
-
- Args:
- msg (str): The error message to be displayed.
-
- Returns:
- None
- """
- sys.exit(f"Execution aborted: {msg}\n")
-
-def logWarning(msg :str, loggerPath :str) -> None:
- """
- Log a warning message to an output log file and print it to the console. The final period and a
- newline is added by the function.
-
- Args:
- s (str): The warning message to be logged and printed.
- loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
- immediately read back (beware relative expensive operation, log with caution).
-
- Returns:
- None
- """
- # building the path and then reading it immediately seems useless, but it's actually a way of
- # validating that reduces repetition on the caller's side. Besides, logging a message by writing
- # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
- # mindlessly logging whenever something comes up, log at the very end and tell the user everything
- # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
- # the file only at the end of the program's execution.
- with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
-
-class CustomErr(Exception):
- """
- Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
- """
- __idGenerator = count()
- errName = "Custom Error"
- def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
- """
- (Private) Initializes an instance of CustomErr.
-
- Args:
- msg (str): Error message to be displayed.
- details (str): Informs the user more about the error encountered. Defaults to "".
- explicitErrCode (int): Explicit error code to be used. Defaults to -1.
-
- Returns:
- None : practically, a CustomErr instance.
- """
- self.msg = msg
- self.details = details
-
- self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
-
- def throw(self, loggerPath = "") -> None:
- """
- Raises the current CustomErr instance, logging a warning message before doing so.
-
- Raises:
- self: The current CustomErr instance.
-
- Returns:
- None
- """
- if loggerPath: logWarning(str(self), loggerPath)
- raise self
-
- def abort(self) -> None:
- """
- Aborts the execution of the script.
-
- Returns:
- None
- """
- terminate(str(self))
-
- def __str__(self) -> str:
- """
- (Private) Returns a string representing the current CustomErr instance.
-
- Returns:
- str: A string representing the current CustomErr instance.
- """
- return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
-
-class ArgsErr(CustomErr):
- """
- CustomErr subclass for UI arguments errors.
- """
- errName = "Args Error"
- def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
- super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
-
-class DataErr(CustomErr):
- """
- CustomErr subclass for data formatting errors.
- """
- errName = "Data Format Error"
- def __init__(self, fileName :str, msg = "no further details provided") -> None:
- super().__init__(f"file \"{fileName}\" contains malformed data", msg)
-
-class PathErr(CustomErr):
- """
- CustomErr subclass for filepath formatting errors.
- """
- errName = "Path Error"
- def __init__(self, path :FilePath, msg = "no further details provided") -> None:
- super().__init__(f"path \"{path}\" is invalid", msg)
-
-class ValueErr(CustomErr):
- """
- CustomErr subclass for any value error.
- """
- errName = "Value Error"
- def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
- super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
-
-# RESULT
-T = TypeVar('T')
-E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
-class Result(Generic[T, E]):
- class ResultErr(CustomErr):
- """
- CustomErr subclass for all Result errors.
- """
- errName = "Result Error"
- def __init__(self, msg = "no further details provided") -> None:
- super().__init__(msg)
- """
- Class to handle the result of an operation, with a value and a boolean flag to indicate
- whether the operation was successful or not.
- """
- def __init__(self, value :Union[T, E], isOk :bool) -> None:
- """
- (Private) Initializes an instance of Result.
-
- Args:
- value (Union[T, E]): The value to be stored in the Result instance.
- isOk (bool): A boolean flag to indicate whether the operation was successful or not.
-
- Returns:
- None : practically, a Result instance.
- """
- self.isOk = isOk
- self.isErr = not isOk
- self.value = value
-
- @classmethod
- def Ok(cls, value :T) -> "Result":
- """
- Constructs a new Result instance with a successful operation.
-
- Args:
- value (T): The value to be stored in the Result instance, set as successful.
-
- Returns:
- Result: A new Result instance with a successful operation.
- """
- return Result(value, isOk = True)
-
- @classmethod
- def Err(cls, value :E) -> "Result":
- """
- Constructs a new Result instance with a failed operation.
-
- Args:
- value (E): The value to be stored in the Result instance, set as failed.
-
- Returns:
- Result: A new Result instance with a failed operation.
- """
- return Result(value, isOk = False)
-
- def unwrap(self) -> T:
- """
- Unwraps the value of the Result instance, if the operation was successful.
-
- Raises:
- ResultErr: If the operation was not successful.
-
- Returns:
- T: The value of the Result instance, if the operation was successful.
- """
- if self.isOk: return self.value
- raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
-
- def unwrapOr(self, default :T) -> T:
- """
- Unwraps the value of the Result instance, if the operation was successful, otherwise
- it returns a default value.
-
- Args:
- default (T): The default value to be returned if the operation was not successful.
-
- Returns:
- T: The value of the Result instance, if the operation was successful,
- otherwise the default value.
- """
- return self.value if self.isOk else default
-
- def expect(self, err :"Result.ResultErr") -> T:
- """
- Expects that the value of the Result instance is successful, otherwise it raises an error.
-
- Args:
- err (Exception): The error to be raised if the operation was not successful.
-
- Raises:
- err: The error raised if the operation was not successful.
-
- Returns:
- T: The value of the Result instance, if the operation was successful.
- """
- if self.isOk: return self.value
- raise err
-
- U = TypeVar("U")
- def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
- """
- Maps the value of the current Result to whatever is returned by the mapper function.
- If the Result contained an unsuccessful operation to begin with it remains unchanged
- (a reference to the current instance is returned).
- If the mapper function panics the returned result instance will be of the error kind.
-
- Args:
- mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
-
- Returns:
- Result[U, E]: The result of the mapper operation applied to the Result value.
- """
- if self.isErr: return self
- try: return Result.Ok(mapper(self.value))
- except Exception as e: return Result.Err(e)
-
- D = TypeVar("D", bound = "Result.ResultErr")
- def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
- """
- Maps the error of the current Result to whatever is returned by the mapper function.
- If the Result contained a successful operation it remains unchanged
- (a reference to the current instance is returned).
- If the mapper function panics this method does as well.
-
- Args:
- mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
-
- Returns:
- Result[U, E]: The result of the mapper operation applied to the Result error.
- """
- if self.isOk: return self
- return Result.Err(mapper(self.value))
-
- def __str__(self):
- return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
-
-# FILES
-def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
- """
- Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
-
- Args:
- path : the path to the dataset file.
- datasetName : the name of the dataset.
-
- Raises:
- DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
- it has less than 2 columns.
-
- Returns:
- pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
- """
- # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
- # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
- # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
- # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
- # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
- # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
- try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
- except:
- try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
- except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
-
- if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
- return dataset
-
-def readPickle(path :FilePath) -> Any:
- """
- Reads the contents of a .pickle file, which needs to exist at the given path.
-
- Args:
- path : the path to the .pickle file.
-
- Returns:
- Any : the data inside a pickle file, could be anything.
- """
- with open(path.show(), "rb") as fd: return pickle.load(fd)
-
-def writePickle(path :FilePath, data :Any) -> None:
- """
- Saves any data in a .pickle file, created at the given path.
-
- Args:
- path : the path to the .pickle file.
- data : the data to be written to the file.
-
- Returns:
- None
- """
- with open(path.show(), "wb") as fd: pickle.dump(data, fd)
-
-def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
- """
- Reads the contents of a .csv file, which needs to exist at the given path.
-
- Args:
- path : the path to the .csv file.
- delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
- skipHeader : whether the first row of the file is a header and should be skipped.
-
- Returns:
- List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
- """
- with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
-
-def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
- """
- Reads the contents of a .svg file, which needs to exist at the given path.
-
- Args:
- path : the path to the .svg file.
-
- Raises:
- DataErr : if the map is malformed.
-
- Returns:
- Any : the data inside a svg file, could be anything.
- """
- try: return ET.parse(path.show())
- except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
- raise customErr if customErr else err
-
-def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
- """
- Saves svg data opened with lxml.etree in a .svg file, created at the given path.
-
- Args:
- path : the path to the .svg file.
- data : the data to be written to the file.
-
- Returns:
- None
- """
- with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
-
-# UI ARGUMENTS
-class Bool:
- def __init__(self, argName :str) -> None:
- self.argName = argName
-
- def __call__(self, s :str) -> bool: return self.check(s)
-
- def check(self, s :str) -> bool:
- s = s.lower()
- if s == "true" : return True
- if s == "false": return False
- raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
-
-class Float:
- def __init__(self, argName = "Dataset values, not an argument") -> None:
- self.argName = argName
-
- def __call__(self, s :str) -> float: return self.check(s)
-
- def check(self, s :str) -> float:
- try: return float(s)
- except ValueError:
- s = s.lower()
- if s == "nan" or s == "none": return math.nan
- raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
-
-# MODELS
-OldRule = List[Union[str, "OldRule"]]
-class Model(Enum):
- """
- Represents a metabolic model, either custom or locally supported. Custom models don't point
- to valid file paths.
- """
-
- Recon = "Recon"
- ENGRO2 = "ENGRO2"
- ENGRO2_no_legend = "ENGRO2_no_legend"
- HMRcore = "HMRcore"
- HMRcore_no_legend = "HMRcore_no_legend"
- Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
-
- def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
- if not path: raise PathErr("<>", "it's necessary to provide a custom path when retrieving files from a custom model")
-
- def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
- """
- Open "rules" file for this model.
-
- Returns:
- Dict[str, Dict[str, OldRule]] : the rules for this model.
- """
- path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
- self.__raiseMissingPathErr(path)
- return readPickle(path)
-
- def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
- """
- Open "gene translator (old: gene_in_rule)" file for this model.
-
- Returns:
- Dict[str, Dict[str, str]] : the translator dict for this model.
- """
- path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
- self.__raiseMissingPathErr(path)
- return readPickle(path)
-
- def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
- path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
- self.__raiseMissingPathErr(path)
- return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
-
- def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
- if(self is Model.Custom):
- return self.load_custom_model(customPath, customExtension)
- else:
- return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
-
- def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
- ext = ext if ext else file_path.ext
- try:
- if str(ext) in FileFormat.XML.value:
- return cobra.io.read_sbml_model(file_path.show())
-
- if str(ext) in FileFormat.JSON.value:
- # Compressed files are not automatically handled by cobra
- if(ext == "json"):
- return cobra.io.load_json_model(file_path.show())
- else:
- return self.extract_model(file_path, ext, "json")
-
- if str(ext) in FileFormat.MAT.value:
- # Compressed files are not automatically handled by cobra
- if(ext == "mat"):
- return cobra.io.load_matlab_model(file_path.show())
- else:
- return self.extract_model(file_path, ext, "mat")
-
- if str(ext) in FileFormat.YML.value:
- # Compressed files are not automatically handled by cobra
- if(ext == "yml"):
- return cobra.io.load_yaml_model(file_path.show())
- else:
- return self.extract_model(file_path, ext, "yml")
-
- except Exception as e: raise DataErr(file_path, e.__str__())
- raise DataErr(file_path,
- f"Fomat \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")
-
-
- def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
- """
- Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).
-
- Args:
- file_path: File path of the model
- ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
-
- Returns:
- cobra.Model: COBRApy model
-
- Raises:
- Exception: Extraction errors
- """
- ext_str = str(ext)
-
- try:
- if '.zip' in ext_str:
- with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
- with zip_ref.open(zip_ref.namelist()[0]) as json_file:
- content = json_file.read().decode('utf-8')
- if model_encoding == "json":
- return cobra.io.load_json_model(StringIO(content))
- elif model_encoding == "mat":
- return cobra.io.load_matlab_model(StringIO(content))
- elif model_encoding == "yml":
- return cobra.io.load_yaml_model(StringIO(content))
- else:
- raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
- elif '.gz' in ext_str:
- with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
- if model_encoding == "json":
- return cobra.io.load_json_model(gz_ref)
- elif model_encoding == "mat":
- return cobra.io.load_matlab_model(gz_ref)
- elif model_encoding == "yml":
- return cobra.io.load_yaml_model(gz_ref)
- else:
- raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
- elif '.bz2' in ext_str:
- with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
- if model_encoding == "json":
- return cobra.io.load_json_model(bz2_ref)
- elif model_encoding == "mat":
- return cobra.io.load_matlab_model(bz2_ref)
- elif model_encoding == "yml":
- return cobra.io.load_yaml_model(bz2_ref)
- else:
- raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
- else:
- raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")
-
- except Exception as e:
- raise Exception(f"Error during model extraction: {str(e)}")
-
-
-
+import math
+import re
+import sys
+import csv
+import pickle
+import lxml.etree as ET
+
+from enum import Enum
+from itertools import count
+from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union
+
+import pandas as pd
+import cobra
+
+import zipfile
+import gzip
+import bz2
+from io import StringIO
+
+class ValueErr(Exception):
+ def __init__(self, param_name, expected, actual):
+ super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
+
+class PathErr(Exception):
+ def __init__(self, path, message):
+ super().__init__(f"Path error for '{path}': {message}")
+
+class FileFormat(Enum):
+ """
+ Encodes possible file extensions to conditionally save data in a different format.
+ """
+ DAT = ("dat",) # this is how galaxy treats all your files!
+ CSV = ("csv",) # this is how most editable input data is written
+ TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
+ SVG = ("svg",) # this is how most metabolic maps are written
+ PNG = ("png",) # this is a common output format for images (such as metabolic maps)
+ PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.
+
+ # Updated to include compressed variants
+ XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
+ JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
+ MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
+ YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed
+
+ TXT = ("txt",) # this is how most output data is written
+ PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
+
+ def __init__(self, *extensions):
+ self.extensions = extensions
+ # Store original extension when set via fromExt
+ self._original_extension = None
+
+ @classmethod
+ def fromExt(cls, ext: str) -> "FileFormat":
+ """
+ Converts a file extension string to a FileFormat instance.
+ Args:
+ ext : The file extension as a string.
+ Returns:
+ FileFormat: The FileFormat instance corresponding to the file extension.
+ """
+ variantName = ext.upper()
+ if variantName in FileFormat.__members__:
+ instance = FileFormat[variantName]
+ instance._original_extension = ext
+ return instance
+
+ variantName = ext.lower()
+ for member in cls:
+ if variantName in member.value:
+ # Create a copy-like behavior by storing the original extension
+ member._original_extension = ext
+ return member
+
+ raise ValueErr("ext", "a valid FileFormat file extension", ext)
+
+ def __str__(self) -> str:
+ """
+ (Private) converts to str representation. Good practice for usage with argparse.
+ Returns:
+ str : the string representation of the file extension.
+ """
+ # If we have an original extension stored (for compressed files only), use it
+ if hasattr(self, '_original_extension') and self._original_extension:
+ return self._original_extension
+
+ # For XML, JSON, MAT and YML without original extension, use the base extension
+ if self == FileFormat.XML:
+ return "xml"
+ elif self == FileFormat.JSON:
+ return "json"
+ elif self == FileFormat.MAT:
+ return "mat"
+ elif self == FileFormat.YML:
+ return "yml"
+
+ return self.value[-1]
+
+class FilePath():
+ """
+ Represents a file path. View this as an attempt to standardize file-related operations by expecting
+ values of this type in any process requesting a file path.
+ """
+ def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
+ """
+ (Private) Initializes an instance of FilePath.
+ Args:
+ path : the end of the path, containing the file name.
+ ext : the file's extension.
+ prefix : anything before path, if the last '/' isn't there it's added by the code.
+ Returns:
+ None : practically, a FilePath instance.
+ """
+ self.ext = ext
+ self.filePath = filePath
+
+ if prefix and prefix[-1] != '/':
+ prefix += '/'
+ self.prefix = prefix
+
+ @classmethod
+ def fromStrPath(cls, path: str) -> "FilePath":
+ """
+ Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
+ It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
+ These double extensions are not supported for other file types such as .csv.
+ Args:
+ path : the string containing the path
+ Raises:
+ PathErr : if the provided string doesn't represent a valid path.
+ Returns:
+ FilePath : the constructed instance.
+ """
+ result = re.search(r"^(?P.*\/)?(?P.*)\.(?P[^.]*)$", path)
+ if not result or not result["name"] or not result["ext"]:
+ raise PathErr(path, "cannot recognize folder structure or extension in path")
+
+ prefix = result["prefix"] if result["prefix"] else ""
+ name, ext = result["name"], result["ext"]
+
+ # Check for double extensions (json.gz, xml.zip, etc.)
+ parts = path.split(".")
+ if len(parts) >= 3:
+ penultimate = parts[-2]
+ last = parts[-1]
+ double_ext = f"{penultimate}.{last}"
+
+ # Try the double extension first
+ try:
+ ext_format = FileFormat.fromExt(double_ext)
+ name = ".".join(parts[:-2])
+ # Extract prefix if it exists
+ if '/' in name:
+ prefix = name[:name.rfind('/') + 1]
+ name = name[name.rfind('/') + 1:]
+ return cls(name, ext_format, prefix=prefix)
+ except ValueErr:
+ # If double extension doesn't work, fall back to single extension
+ pass
+
+ # Single extension fallback (original logic)
+ try:
+ ext_format = FileFormat.fromExt(ext)
+ return cls(name, ext_format, prefix=prefix)
+ except ValueErr:
+ raise PathErr(path, f"unsupported file extension: {ext}")
+
+ def show(self) -> str:
+ """
+ Shows the path as a string.
+ Returns:
+ str : the path shown as a string.
+ """
+ return f"{self.prefix}{self.filePath}.{self.ext}"
+
+ def __str__(self) -> str:
+ return self.show()
+
+# ERRORS
+def terminate(msg :str) -> None:
+ """
+ Terminate the execution of the script with an error message.
+
+ Args:
+ msg (str): The error message to be displayed.
+
+ Returns:
+ None
+ """
+ sys.exit(f"Execution aborted: {msg}\n")
+
+def logWarning(msg :str, loggerPath :str) -> None:
+ """
+ Log a warning message to an output log file and print it to the console. The final period and a
+ newline is added by the function.
+
+ Args:
+ s (str): The warning message to be logged and printed.
+ loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
+ immediately read back (beware relative expensive operation, log with caution).
+
+ Returns:
+ None
+ """
+ # building the path and then reading it immediately seems useless, but it's actually a way of
+ # validating that reduces repetition on the caller's side. Besides, logging a message by writing
+ # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
+ # mindlessly logging whenever something comes up, log at the very end and tell the user everything
+ # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
+ # the file only at the end of the program's execution.
+ with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
+
+class CustomErr(Exception):
+ """
+ Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
+ """
+ __idGenerator = count()
+ errName = "Custom Error"
+ def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
+ """
+ (Private) Initializes an instance of CustomErr.
+
+ Args:
+ msg (str): Error message to be displayed.
+ details (str): Informs the user more about the error encountered. Defaults to "".
+ explicitErrCode (int): Explicit error code to be used. Defaults to -1.
+
+ Returns:
+ None : practically, a CustomErr instance.
+ """
+ self.msg = msg
+ self.details = details
+
+ self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
+
+ def throw(self, loggerPath = "") -> None:
+ """
+ Raises the current CustomErr instance, logging a warning message before doing so.
+
+ Raises:
+ self: The current CustomErr instance.
+
+ Returns:
+ None
+ """
+ if loggerPath: logWarning(str(self), loggerPath)
+ raise self
+
+ def abort(self) -> None:
+ """
+ Aborts the execution of the script.
+
+ Returns:
+ None
+ """
+ terminate(str(self))
+
+ def __str__(self) -> str:
+ """
+ (Private) Returns a string representing the current CustomErr instance.
+
+ Returns:
+ str: A string representing the current CustomErr instance.
+ """
+ return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
+
+class ArgsErr(CustomErr):
+ """
+ CustomErr subclass for UI arguments errors.
+ """
+ errName = "Args Error"
+ def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
+ super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
+
+class DataErr(CustomErr):
+ """
+ CustomErr subclass for data formatting errors.
+ """
+ errName = "Data Format Error"
+ def __init__(self, fileName :str, msg = "no further details provided") -> None:
+ super().__init__(f"file \"{fileName}\" contains malformed data", msg)
+
+class PathErr(CustomErr):
+ """
+ CustomErr subclass for filepath formatting errors.
+ """
+ errName = "Path Error"
+ def __init__(self, path :FilePath, msg = "no further details provided") -> None:
+ super().__init__(f"path \"{path}\" is invalid", msg)
+
+class ValueErr(CustomErr):
+ """
+ CustomErr subclass for any value error.
+ """
+ errName = "Value Error"
+ def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
+ super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
+
+# RESULT
+T = TypeVar('T')
+E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
+class Result(Generic[T, E]):
+ class ResultErr(CustomErr):
+ """
+ CustomErr subclass for all Result errors.
+ """
+ errName = "Result Error"
+ def __init__(self, msg = "no further details provided") -> None:
+ super().__init__(msg)
+ """
+ Class to handle the result of an operation, with a value and a boolean flag to indicate
+ whether the operation was successful or not.
+ """
+ def __init__(self, value :Union[T, E], isOk :bool) -> None:
+ """
+ (Private) Initializes an instance of Result.
+
+ Args:
+ value (Union[T, E]): The value to be stored in the Result instance.
+ isOk (bool): A boolean flag to indicate whether the operation was successful or not.
+
+ Returns:
+ None : practically, a Result instance.
+ """
+ self.isOk = isOk
+ self.isErr = not isOk
+ self.value = value
+
+ @classmethod
+ def Ok(cls, value :T) -> "Result":
+ """
+ Constructs a new Result instance with a successful operation.
+
+ Args:
+ value (T): The value to be stored in the Result instance, set as successful.
+
+ Returns:
+ Result: A new Result instance with a successful operation.
+ """
+ return Result(value, isOk = True)
+
+ @classmethod
+ def Err(cls, value :E) -> "Result":
+ """
+ Constructs a new Result instance with a failed operation.
+
+ Args:
+ value (E): The value to be stored in the Result instance, set as failed.
+
+ Returns:
+ Result: A new Result instance with a failed operation.
+ """
+ return Result(value, isOk = False)
+
+ def unwrap(self) -> T:
+ """
+ Unwraps the value of the Result instance, if the operation was successful.
+
+ Raises:
+ ResultErr: If the operation was not successful.
+
+ Returns:
+ T: The value of the Result instance, if the operation was successful.
+ """
+ if self.isOk: return self.value
+ raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
+
+ def unwrapOr(self, default :T) -> T:
+ """
+ Unwraps the value of the Result instance, if the operation was successful, otherwise
+ it returns a default value.
+
+ Args:
+ default (T): The default value to be returned if the operation was not successful.
+
+ Returns:
+ T: The value of the Result instance, if the operation was successful,
+ otherwise the default value.
+ """
+ return self.value if self.isOk else default
+
+ def expect(self, err :"Result.ResultErr") -> T:
+ """
+ Expects that the value of the Result instance is successful, otherwise it raises an error.
+
+ Args:
+ err (Exception): The error to be raised if the operation was not successful.
+
+ Raises:
+ err: The error raised if the operation was not successful.
+
+ Returns:
+ T: The value of the Result instance, if the operation was successful.
+ """
+ if self.isOk: return self.value
+ raise err
+
+ U = TypeVar("U")
+ def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
+ """
+ Maps the value of the current Result to whatever is returned by the mapper function.
+ If the Result contained an unsuccessful operation to begin with it remains unchanged
+ (a reference to the current instance is returned).
+ If the mapper function panics the returned result instance will be of the error kind.
+
+ Args:
+ mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
+
+ Returns:
+ Result[U, E]: The result of the mapper operation applied to the Result value.
+ """
+ if self.isErr: return self
+ try: return Result.Ok(mapper(self.value))
+ except Exception as e: return Result.Err(e)
+
+ D = TypeVar("D", bound = "Result.ResultErr")
+ def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
+ """
+ Maps the error of the current Result to whatever is returned by the mapper function.
+ If the Result contained a successful operation it remains unchanged
+ (a reference to the current instance is returned).
+ If the mapper function panics this method does as well.
+
+ Args:
+ mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
+
+ Returns:
+ Result[U, E]: The result of the mapper operation applied to the Result error.
+ """
+ if self.isOk: return self
+ return Result.Err(mapper(self.value))
+
+ def __str__(self):
+ return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
+
+# FILES
+def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
+ """
+ Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
+
+ Args:
+ path : the path to the dataset file.
+ datasetName : the name of the dataset.
+
+ Raises:
+ DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
+ it has less than 2 columns.
+
+ Returns:
+ pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
+ """
+ # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
+ # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
+ # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
+ # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
+ # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
+ # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
+ try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
+ except:
+ try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
+ except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
+
+ if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
+ return dataset
+
+def readPickle(path :FilePath) -> Any:
+ """
+ Reads the contents of a .pickle file, which needs to exist at the given path.
+
+ Args:
+ path : the path to the .pickle file.
+
+ Returns:
+ Any : the data inside a pickle file, could be anything.
+ """
+ with open(path.show(), "rb") as fd: return pickle.load(fd)
+
+def writePickle(path :FilePath, data :Any) -> None:
+ """
+ Saves any data in a .pickle file, created at the given path.
+
+ Args:
+ path : the path to the .pickle file.
+ data : the data to be written to the file.
+
+ Returns:
+ None
+ """
+ with open(path.show(), "wb") as fd: pickle.dump(data, fd)
+
+def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
+ """
+ Reads the contents of a .csv file, which needs to exist at the given path.
+
+ Args:
+ path : the path to the .csv file.
+ delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
+ skipHeader : whether the first row of the file is a header and should be skipped.
+
+ Returns:
+ List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
+ """
+ with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
+
+def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
+ """
+ Reads the contents of a .svg file, which needs to exist at the given path.
+
+ Args:
+ path : the path to the .svg file.
+
+ Raises:
+ DataErr : if the map is malformed.
+
+ Returns:
+ Any : the data inside a svg file, could be anything.
+ """
+ try: return ET.parse(path.show())
+ except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
+ raise customErr if customErr else err
+
+def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
+ """
+ Saves svg data opened with lxml.etree in a .svg file, created at the given path.
+
+ Args:
+ path : the path to the .svg file.
+ data : the data to be written to the file.
+
+ Returns:
+ None
+ """
+ with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
+
+# UI ARGUMENTS
+class Bool:
+ def __init__(self, argName :str) -> None:
+ self.argName = argName
+
+ def __call__(self, s :str) -> bool: return self.check(s)
+
+ def check(self, s :str) -> bool:
+ s = s.lower()
+ if s == "true" : return True
+ if s == "false": return False
+ raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
+
+class Float:
+ def __init__(self, argName = "Dataset values, not an argument") -> None:
+ self.argName = argName
+
+ def __call__(self, s :str) -> float: return self.check(s)
+
+ def check(self, s :str) -> float:
+ try: return float(s)
+ except ValueError:
+ s = s.lower()
+ if s == "nan" or s == "none": return math.nan
+ raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
+
+# MODELS
+OldRule = List[Union[str, "OldRule"]]
+class Model(Enum):
+ """
+ Represents a metabolic model, either custom or locally supported. Custom models don't point
+ to valid file paths.
+ """
+
+ Recon = "Recon"
+ ENGRO2 = "ENGRO2"
+ ENGRO2_no_legend = "ENGRO2_no_legend"
+ HMRcore = "HMRcore"
+ HMRcore_no_legend = "HMRcore_no_legend"
+ Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
+
+ def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
+ if not path: raise PathErr("<>", "it's necessary to provide a custom path when retrieving files from a custom model")
+
+ def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
+ """
+ Open "rules" file for this model.
+
+ Returns:
+ Dict[str, Dict[str, OldRule]] : the rules for this model.
+ """
+ path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
+ self.__raiseMissingPathErr(path)
+ return readPickle(path)
+
+ def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
+ """
+ Open "gene translator (old: gene_in_rule)" file for this model.
+
+ Returns:
+ Dict[str, Dict[str, str]] : the translator dict for this model.
+ """
+ path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
+ self.__raiseMissingPathErr(path)
+ return readPickle(path)
+
+ def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
+ path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
+ self.__raiseMissingPathErr(path)
+ return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
+
+ def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
+ if(self is Model.Custom):
+ return self.load_custom_model(customPath, customExtension)
+ else:
+ return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
+
+ def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
+ ext = ext if ext else file_path.ext
+ try:
+ if str(ext) in FileFormat.XML.value:
+ return cobra.io.read_sbml_model(file_path.show())
+
+ if str(ext) in FileFormat.JSON.value:
+ # Compressed files are not automatically handled by cobra
+ if(ext == "json"):
+ return cobra.io.load_json_model(file_path.show())
+ else:
+ return self.extract_model(file_path, ext, "json")
+
+ if str(ext) in FileFormat.MAT.value:
+ # Compressed files are not automatically handled by cobra
+ if(ext == "mat"):
+ return cobra.io.load_matlab_model(file_path.show())
+ else:
+ return self.extract_model(file_path, ext, "mat")
+
+ if str(ext) in FileFormat.YML.value:
+ # Compressed files are not automatically handled by cobra
+ if(ext == "yml"):
+ return cobra.io.load_yaml_model(file_path.show())
+ else:
+ return self.extract_model(file_path, ext, "yml")
+
+ except Exception as e: raise DataErr(file_path, e.__str__())
+ raise DataErr(file_path,
+ f"Fomat \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")
+
+
+ def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
+ """
+ Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).
+
+ Args:
+ file_path: File path of the model
+ ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
+
+ Returns:
+ cobra.Model: COBRApy model
+
+ Raises:
+ Exception: Extraction errors
+ """
+ ext_str = str(ext)
+
+ try:
+ if '.zip' in ext_str:
+ with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
+ with zip_ref.open(zip_ref.namelist()[0]) as json_file:
+ content = json_file.read().decode('utf-8')
+ if model_encoding == "json":
+ return cobra.io.load_json_model(StringIO(content))
+ elif model_encoding == "mat":
+ return cobra.io.load_matlab_model(StringIO(content))
+ elif model_encoding == "yml":
+ return cobra.io.load_yaml_model(StringIO(content))
+ else:
+ raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
+ elif '.gz' in ext_str:
+ with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
+ if model_encoding == "json":
+ return cobra.io.load_json_model(gz_ref)
+ elif model_encoding == "mat":
+ return cobra.io.load_matlab_model(gz_ref)
+ elif model_encoding == "yml":
+ return cobra.io.load_yaml_model(gz_ref)
+ else:
+ raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
+ elif '.bz2' in ext_str:
+ with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
+ if model_encoding == "json":
+ return cobra.io.load_json_model(bz2_ref)
+ elif model_encoding == "mat":
+ return cobra.io.load_matlab_model(bz2_ref)
+ elif model_encoding == "yml":
+ return cobra.io.load_yaml_model(bz2_ref)
+ else:
+ raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
+ else:
+ raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")
+
+ except Exception as e:
+ raise Exception(f"Error during model extraction: {str(e)}")
+
+
+
def __str__(self) -> str: return self.value
\ No newline at end of file