| 335 | 1 import math | 
|  | 2 import re | 
|  | 3 import sys | 
|  | 4 import csv | 
|  | 5 import pickle | 
|  | 6 import lxml.etree as ET | 
|  | 7 | 
|  | 8 from enum import Enum | 
|  | 9 from itertools import count | 
|  | 10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union | 
|  | 11 | 
|  | 12 import pandas as pd | 
|  | 13 import cobra | 
|  | 14 | 
|  | 15 import zipfile | 
|  | 16 import gzip | 
|  | 17 import bz2 | 
|  | 18 from io import StringIO | 
|  | 19 | 
| 339 | 20 class ValueErr(Exception): | 
|  | 21     def __init__(self, param_name, expected, actual): | 
|  | 22         super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}") | 
|  | 23 | 
|  | 24 class PathErr(Exception): | 
|  | 25     def __init__(self, path, message): | 
|  | 26         super().__init__(f"Path error for '{path}': {message}") | 
|  | 27 | 
| 335 | 28 class FileFormat(Enum): | 
|  | 29     """ | 
|  | 30     Encodes possible file extensions to conditionally save data in a different format. | 
|  | 31     """ | 
|  | 32     DAT    = ("dat",) # this is how galaxy treats all your files! | 
|  | 33     CSV    = ("csv",) # this is how most editable input data is written | 
| 339 | 34     TSV    = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!! | 
| 335 | 35     SVG    = ("svg",) # this is how most metabolic maps are written | 
|  | 36     PNG    = ("png",) # this is a common output format for images (such as metabolic maps) | 
|  | 37     PDF    = ("pdf",) # this is also a common output format for images, as it's required in publications. | 
| 339 | 38 | 
|  | 39     # Updated to include compressed variants | 
|  | 40     XML    = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed | 
|  | 41     JSON   = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed | 
|  | 42 | 
| 335 | 43     TXT = ("txt",) # this is how most output data is written | 
|  | 44     PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved | 
|  | 45 | 
| 339 | 46     def __init__(self, *extensions): | 
|  | 47         self.extensions = extensions | 
|  | 48         # Store original extension when set via fromExt | 
|  | 49         self._original_extension = None | 
|  | 50 | 
| 335 | 51     @classmethod | 
| 339 | 52     def fromExt(cls, ext: str) -> "FileFormat": | 
| 335 | 53         """ | 
|  | 54         Converts a file extension string to a FileFormat instance. | 
|  | 55         Args: | 
|  | 56             ext : The file extension as a string. | 
|  | 57         Returns: | 
|  | 58             FileFormat: The FileFormat instance corresponding to the file extension. | 
|  | 59         """ | 
|  | 60         variantName = ext.upper() | 
|  | 61         if variantName in FileFormat.__members__: | 
|  | 62             instance = FileFormat[variantName] | 
| 339 | 63             instance._original_extension = ext | 
| 335 | 64             return instance | 
|  | 65 | 
| 339 | 66         variantName = ext.lower() | 
| 335 | 67         for member in cls: | 
|  | 68             if variantName in member.value: | 
| 339 | 69                 # Create a copy-like behavior by storing the original extension | 
|  | 70                 member._original_extension = ext | 
| 335 | 71                 return member | 
|  | 72 | 
|  | 73         raise ValueErr("ext", "a valid FileFormat file extension", ext) | 
|  | 74 | 
|  | 75     def __str__(self) -> str: | 
|  | 76         """ | 
|  | 77         (Private) converts to str representation. Good practice for usage with argparse. | 
|  | 78         Returns: | 
|  | 79             str : the string representation of the file extension. | 
|  | 80         """ | 
| 339 | 81         # If we have an original extension stored (for compressed files), use it | 
|  | 82         if hasattr(self, '_original_extension') and self._original_extension: | 
|  | 83             return self._original_extension | 
|  | 84 | 
|  | 85         # TODO: fix, it's the dumb pickle thing keep this behaviour if we are not dealing with XML or JSON | 
|  | 86         return self.value[-1] | 
| 335 | 87 | 
|  | 88 class FilePath(): | 
|  | 89     """ | 
|  | 90     Represents a file path. View this as an attempt to standardize file-related operations by expecting | 
|  | 91     values of this type in any process requesting a file path. | 
|  | 92     """ | 
| 339 | 93     def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None: | 
| 335 | 94         """ | 
|  | 95         (Private) Initializes an instance of FilePath. | 
|  | 96         Args: | 
|  | 97             path : the end of the path, containing the file name. | 
|  | 98             ext : the file's extension. | 
|  | 99             prefix : anything before path, if the last '/' isn't there it's added by the code. | 
|  | 100         Returns: | 
|  | 101             None : practically, a FilePath instance. | 
|  | 102         """ | 
| 339 | 103         self.ext = ext | 
| 335 | 104         self.filePath = filePath | 
|  | 105 | 
| 339 | 106         if prefix and prefix[-1] != '/': | 
|  | 107             prefix += '/' | 
| 335 | 108         self.prefix = prefix | 
|  | 109 | 
|  | 110     @classmethod | 
| 339 | 111     def fromStrPath(cls, path: str) -> "FilePath": | 
| 335 | 112         """ | 
|  | 113         Factory method to parse a string from which to obtain, if possible, a valid FilePath instance. | 
|  | 114         It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models. | 
|  | 115         These double extensions are not supported for other file types such as .csv. | 
|  | 116         Args: | 
|  | 117             path : the string containing the path | 
|  | 118         Raises: | 
|  | 119             PathErr : if the provided string doesn't represent a valid path. | 
|  | 120         Returns: | 
|  | 121             FilePath : the constructed instance. | 
|  | 122         """ | 
|  | 123         result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path) | 
|  | 124         if not result or not result["name"] or not result["ext"]: | 
|  | 125             raise PathErr(path, "cannot recognize folder structure or extension in path") | 
|  | 126 | 
|  | 127         prefix = result["prefix"] if result["prefix"] else "" | 
|  | 128         name, ext = result["name"], result["ext"] | 
|  | 129 | 
| 339 | 130         # Check for double extensions (json.gz, xml.zip, etc.) | 
| 335 | 131         parts = path.split(".") | 
|  | 132         if len(parts) >= 3: | 
|  | 133             penultimate = parts[-2] | 
|  | 134             last = parts[-1] | 
| 339 | 135             double_ext = f"{penultimate}.{last}" | 
|  | 136 | 
|  | 137             # Try the double extension first | 
|  | 138             try: | 
|  | 139                 ext_format = FileFormat.fromExt(double_ext) | 
| 335 | 140                 name = ".".join(parts[:-2]) | 
| 339 | 141                 # Extract prefix if it exists | 
|  | 142                 if '/' in name: | 
|  | 143                     prefix = name[:name.rfind('/') + 1] | 
|  | 144                     name = name[name.rfind('/') + 1:] | 
|  | 145                 return cls(name, ext_format, prefix=prefix) | 
|  | 146             except ValueErr: | 
|  | 147                 # If double extension doesn't work, fall back to single extension | 
|  | 148                 pass | 
| 335 | 149 | 
| 339 | 150         # Single extension fallback (original logic) | 
|  | 151         try: | 
|  | 152             ext_format = FileFormat.fromExt(ext) | 
|  | 153             return cls(name, ext_format, prefix=prefix) | 
|  | 154         except ValueErr: | 
|  | 155             raise PathErr(path, f"unsupported file extension: {ext}") | 
| 335 | 156 | 
|  | 157     def show(self) -> str: | 
|  | 158         """ | 
|  | 159         Shows the path as a string. | 
|  | 160         Returns: | 
|  | 161             str : the path shown as a string. | 
|  | 162         """ | 
|  | 163         return f"{self.prefix}{self.filePath}.{self.ext}" | 
|  | 164 | 
| 339 | 165     def __str__(self) -> str: | 
|  | 166         return self.show() | 
| 335 | 167 | 
|  | 168 # ERRORS | 
|  | 169 def terminate(msg :str) -> None: | 
|  | 170     """ | 
|  | 171     Terminate the execution of the script with an error message. | 
|  | 172 | 
|  | 173     Args: | 
|  | 174         msg (str): The error message to be displayed. | 
|  | 175 | 
|  | 176     Returns: | 
|  | 177         None | 
|  | 178     """ | 
|  | 179     sys.exit(f"Execution aborted: {msg}\n") | 
|  | 180 | 
|  | 181 def logWarning(msg :str, loggerPath :str) -> None: | 
|  | 182     """ | 
|  | 183     Log a warning message to an output log file and print it to the console. The final period and a | 
|  | 184     newline is added by the function. | 
|  | 185 | 
|  | 186     Args: | 
|  | 187         s (str): The warning message to be logged and printed. | 
|  | 188         loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and | 
|  | 189         immediately read back (beware relative expensive operation, log with caution). | 
|  | 190 | 
|  | 191     Returns: | 
|  | 192         None | 
|  | 193     """ | 
|  | 194     # building the path and then reading it immediately seems useless, but it's actually a way of | 
|  | 195     # validating that reduces repetition on the caller's side. Besides, logging a message by writing | 
|  | 196     # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from | 
|  | 197     # mindlessly logging whenever something comes up, log at the very end and tell the user everything | 
|  | 198     # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to | 
|  | 199     # the file only at the end of the program's execution. | 
|  | 200     with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") | 
|  | 201 | 
|  | 202 class CustomErr(Exception): | 
|  | 203     """ | 
|  | 204     Custom error class to handle exceptions in a structured way, with a unique identifier and a message. | 
|  | 205     """ | 
|  | 206     __idGenerator = count() | 
|  | 207     errName = "Custom Error" | 
|  | 208     def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None: | 
|  | 209         """ | 
|  | 210         (Private) Initializes an instance of CustomErr. | 
|  | 211 | 
|  | 212         Args: | 
|  | 213             msg (str): Error message to be displayed. | 
|  | 214             details (str): Informs the user more about the error encountered. Defaults to "". | 
|  | 215             explicitErrCode (int): Explicit error code to be used. Defaults to -1. | 
|  | 216 | 
|  | 217         Returns: | 
|  | 218             None : practically, a CustomErr instance. | 
|  | 219         """ | 
|  | 220         self.msg     = msg | 
|  | 221         self.details = details | 
|  | 222 | 
|  | 223         self.id = max(explicitErrCode, next(CustomErr.__idGenerator)) | 
|  | 224 | 
|  | 225     def throw(self, loggerPath = "") -> None: | 
|  | 226         """ | 
|  | 227         Raises the current CustomErr instance, logging a warning message before doing so. | 
|  | 228 | 
|  | 229         Raises: | 
|  | 230             self: The current CustomErr instance. | 
|  | 231 | 
|  | 232         Returns: | 
|  | 233             None | 
|  | 234         """ | 
|  | 235         if loggerPath: logWarning(str(self), loggerPath) | 
|  | 236         raise self | 
|  | 237 | 
|  | 238     def abort(self) -> None: | 
|  | 239         """ | 
|  | 240         Aborts the execution of the script. | 
|  | 241 | 
|  | 242         Returns: | 
|  | 243             None | 
|  | 244         """ | 
|  | 245         terminate(str(self)) | 
|  | 246 | 
|  | 247     def __str__(self) -> str: | 
|  | 248         """ | 
|  | 249         (Private) Returns a string representing the current CustomErr instance. | 
|  | 250 | 
|  | 251         Returns: | 
|  | 252             str: A string representing the current CustomErr instance. | 
|  | 253         """ | 
|  | 254         return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}." | 
|  | 255 | 
|  | 256 class ArgsErr(CustomErr): | 
|  | 257     """ | 
|  | 258     CustomErr subclass for UI arguments errors. | 
|  | 259     """ | 
|  | 260     errName = "Args Error" | 
|  | 261     def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None: | 
|  | 262         super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg) | 
|  | 263 | 
|  | 264 class DataErr(CustomErr): | 
|  | 265     """ | 
|  | 266     CustomErr subclass for data formatting errors. | 
|  | 267     """ | 
|  | 268     errName = "Data Format Error" | 
|  | 269     def __init__(self, fileName :str, msg = "no further details provided") -> None: | 
|  | 270         super().__init__(f"file \"{fileName}\" contains malformed data", msg) | 
|  | 271 | 
|  | 272 class PathErr(CustomErr): | 
|  | 273     """ | 
|  | 274     CustomErr subclass for filepath formatting errors. | 
|  | 275     """ | 
|  | 276     errName = "Path Error" | 
|  | 277     def __init__(self, path :FilePath, msg = "no further details provided") -> None: | 
|  | 278         super().__init__(f"path \"{path}\" is invalid", msg) | 
|  | 279 | 
|  | 280 class ValueErr(CustomErr): | 
|  | 281     """ | 
|  | 282     CustomErr subclass for any value error. | 
|  | 283     """ | 
|  | 284     errName = "Value Error" | 
|  | 285     def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None: | 
|  | 286         super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg) | 
|  | 287 | 
|  | 288 # RESULT | 
|  | 289 T = TypeVar('T') | 
|  | 290 E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened! | 
|  | 291 class Result(Generic[T, E]): | 
|  | 292     class ResultErr(CustomErr): | 
|  | 293         """ | 
|  | 294         CustomErr subclass for all Result errors. | 
|  | 295         """ | 
|  | 296         errName = "Result Error" | 
|  | 297         def __init__(self, msg = "no further details provided") -> None: | 
|  | 298             super().__init__(msg) | 
|  | 299     """ | 
|  | 300     Class to handle the result of an operation, with a value and a boolean flag to indicate | 
|  | 301     whether the operation was successful or not. | 
|  | 302     """ | 
|  | 303     def __init__(self, value :Union[T, E], isOk :bool) -> None: | 
|  | 304         """ | 
|  | 305         (Private) Initializes an instance of Result. | 
|  | 306 | 
|  | 307         Args: | 
|  | 308             value (Union[T, E]): The value to be stored in the Result instance. | 
|  | 309             isOk (bool): A boolean flag to indicate whether the operation was successful or not. | 
|  | 310 | 
|  | 311             Returns: | 
|  | 312                 None : practically, a Result instance. | 
|  | 313         """ | 
|  | 314         self.isOk  = isOk | 
|  | 315         self.isErr = not isOk | 
|  | 316         self.value = value | 
|  | 317 | 
|  | 318     @classmethod | 
|  | 319     def Ok(cls,  value :T) -> "Result": | 
|  | 320         """ | 
|  | 321         Constructs a new Result instance with a successful operation. | 
|  | 322 | 
|  | 323         Args: | 
|  | 324             value (T): The value to be stored in the Result instance, set as successful. | 
|  | 325 | 
|  | 326         Returns: | 
|  | 327             Result: A new Result instance with a successful operation. | 
|  | 328         """ | 
|  | 329         return Result(value, isOk = True) | 
|  | 330 | 
|  | 331     @classmethod | 
|  | 332     def Err(cls, value :E) -> "Result": | 
|  | 333         """ | 
|  | 334         Constructs a new Result instance with a failed operation. | 
|  | 335 | 
|  | 336         Args: | 
|  | 337             value (E): The value to be stored in the Result instance, set as failed. | 
|  | 338 | 
|  | 339         Returns: | 
|  | 340             Result: A new Result instance with a failed operation. | 
|  | 341         """ | 
|  | 342         return Result(value, isOk = False) | 
|  | 343 | 
|  | 344     def unwrap(self) -> T: | 
|  | 345         """ | 
|  | 346         Unwraps the value of the Result instance, if the operation was successful. | 
|  | 347 | 
|  | 348         Raises: | 
|  | 349             ResultErr: If the operation was not successful. | 
|  | 350 | 
|  | 351         Returns: | 
|  | 352             T: The value of the Result instance, if the operation was successful. | 
|  | 353         """ | 
|  | 354         if self.isOk: return self.value | 
|  | 355         raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}") | 
|  | 356 | 
|  | 357     def unwrapOr(self, default :T) -> T: | 
|  | 358         """ | 
|  | 359         Unwraps the value of the Result instance, if the operation was successful, otherwise | 
|  | 360         it returns a default value. | 
|  | 361 | 
|  | 362         Args: | 
|  | 363             default (T): The default value to be returned if the operation was not successful. | 
|  | 364 | 
|  | 365         Returns: | 
|  | 366             T: The value of the Result instance, if the operation was successful, | 
|  | 367             otherwise the default value. | 
|  | 368         """ | 
|  | 369         return self.value if self.isOk else default | 
|  | 370 | 
|  | 371     def expect(self, err :"Result.ResultErr") -> T: | 
|  | 372         """ | 
|  | 373         Expects that the value of the Result instance is successful, otherwise it raises an error. | 
|  | 374 | 
|  | 375         Args: | 
|  | 376             err (Exception): The error to be raised if the operation was not successful. | 
|  | 377 | 
|  | 378         Raises: | 
|  | 379             err: The error raised if the operation was not successful. | 
|  | 380 | 
|  | 381         Returns: | 
|  | 382             T: The value of the Result instance, if the operation was successful. | 
|  | 383         """ | 
|  | 384         if self.isOk: return self.value | 
|  | 385         raise err | 
|  | 386 | 
|  | 387     U = TypeVar("U") | 
|  | 388     def map(self, mapper: Callable[[T], U]) -> "Result[U, E]": | 
|  | 389         """ | 
|  | 390         Maps the value of the current Result to whatever is returned by the mapper function. | 
|  | 391         If the Result contained an unsuccessful operation to begin with it remains unchanged | 
|  | 392         (a reference to the current instance is returned). | 
|  | 393         If the mapper function panics the returned result instance will be of the error kind. | 
|  | 394 | 
|  | 395         Args: | 
|  | 396             mapper (Callable[[T], U]): The mapper operation to be applied to the Result value. | 
|  | 397 | 
|  | 398         Returns: | 
|  | 399             Result[U, E]: The result of the mapper operation applied to the Result value. | 
|  | 400         """ | 
|  | 401         if self.isErr: return self | 
|  | 402         try: return Result.Ok(mapper(self.value)) | 
|  | 403         except Exception as e: return Result.Err(e) | 
|  | 404 | 
|  | 405     D = TypeVar("D", bound = "Result.ResultErr") | 
|  | 406     def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]": | 
|  | 407         """ | 
|  | 408         Maps the error of the current Result to whatever is returned by the mapper function. | 
|  | 409         If the Result contained a successful operation it remains unchanged | 
|  | 410         (a reference to the current instance is returned). | 
|  | 411         If the mapper function panics this method does as well. | 
|  | 412 | 
|  | 413         Args: | 
|  | 414             mapper (Callable[[E], D]): The mapper operation to be applied to the Result error. | 
|  | 415 | 
|  | 416         Returns: | 
|  | 417             Result[U, E]: The result of the mapper operation applied to the Result error. | 
|  | 418         """ | 
|  | 419         if self.isOk: return self | 
|  | 420         return Result.Err(mapper(self.value)) | 
|  | 421 | 
|  | 422     def __str__(self): | 
|  | 423         return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" | 
|  | 424 | 
|  | 425 # FILES | 
|  | 426 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame: | 
|  | 427     """ | 
|  | 428     Reads a .csv or .tsv file and returns it as a Pandas DataFrame. | 
|  | 429 | 
|  | 430     Args: | 
|  | 431         path : the path to the dataset file. | 
|  | 432         datasetName : the name of the dataset. | 
|  | 433 | 
|  | 434     Raises: | 
|  | 435         DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if | 
|  | 436         it has less than 2 columns. | 
|  | 437 | 
|  | 438     Returns: | 
|  | 439         pandas.DataFrame: The dataset loaded as a Pandas DataFrame. | 
|  | 440     """ | 
|  | 441     # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than | 
|  | 442     # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code. | 
|  | 443     # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really | 
|  | 444     # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and | 
|  | 445     # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is | 
|  | 446     # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions. | 
|  | 447     try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python") | 
|  | 448     except: | 
|  | 449         try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python") | 
|  | 450         except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") | 
|  | 451 | 
|  | 452     if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns") | 
|  | 453     return dataset | 
|  | 454 | 
|  | 455 def readPickle(path :FilePath) -> Any: | 
|  | 456     """ | 
|  | 457     Reads the contents of a .pickle file, which needs to exist at the given path. | 
|  | 458 | 
|  | 459     Args: | 
|  | 460         path : the path to the .pickle file. | 
|  | 461 | 
|  | 462     Returns: | 
|  | 463         Any : the data inside a pickle file, could be anything. | 
|  | 464     """ | 
|  | 465     with open(path.show(), "rb") as fd: return pickle.load(fd) | 
|  | 466 | 
|  | 467 def writePickle(path :FilePath, data :Any) -> None: | 
|  | 468     """ | 
|  | 469     Saves any data in a .pickle file, created at the given path. | 
|  | 470 | 
|  | 471     Args: | 
|  | 472         path : the path to the .pickle file. | 
|  | 473         data : the data to be written to the file. | 
|  | 474 | 
|  | 475     Returns: | 
|  | 476         None | 
|  | 477     """ | 
|  | 478     with open(path.show(), "wb") as fd: pickle.dump(data, fd) | 
|  | 479 | 
|  | 480 def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]: | 
|  | 481     """ | 
|  | 482     Reads the contents of a .csv file, which needs to exist at the given path. | 
|  | 483 | 
|  | 484     Args: | 
|  | 485         path : the path to the .csv file. | 
|  | 486         delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter). | 
|  | 487         skipHeader : whether the first row of the file is a header and should be skipped. | 
|  | 488 | 
|  | 489     Returns: | 
|  | 490         List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas. | 
|  | 491     """ | 
|  | 492     with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:] | 
|  | 493 | 
|  | 494 def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree: | 
|  | 495     """ | 
|  | 496     Reads the contents of a .svg file, which needs to exist at the given path. | 
|  | 497 | 
|  | 498     Args: | 
|  | 499         path : the path to the .svg file. | 
|  | 500 | 
|  | 501     Raises: | 
|  | 502         DataErr : if the map is malformed. | 
|  | 503 | 
|  | 504     Returns: | 
|  | 505         Any : the data inside a svg file, could be anything. | 
|  | 506     """ | 
|  | 507     try: return ET.parse(path.show()) | 
|  | 508     except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err: | 
|  | 509         raise customErr if customErr else err | 
|  | 510 | 
|  | 511 def writeSvg(path :FilePath, data:ET.ElementTree) -> None: | 
|  | 512     """ | 
|  | 513     Saves svg data opened with lxml.etree in a .svg file, created at the given path. | 
|  | 514 | 
|  | 515     Args: | 
|  | 516         path : the path to the .svg file. | 
|  | 517         data : the data to be written to the file. | 
|  | 518 | 
|  | 519     Returns: | 
|  | 520         None | 
|  | 521     """ | 
|  | 522     with open(path.show(), "wb") as fd: fd.write(ET.tostring(data)) | 
|  | 523 | 
|  | 524 # UI ARGUMENTS | 
|  | 525 class Bool: | 
|  | 526     def __init__(self, argName :str) -> None: | 
|  | 527         self.argName = argName | 
|  | 528 | 
|  | 529     def __call__(self, s :str) -> bool: return self.check(s) | 
|  | 530 | 
|  | 531     def check(self, s :str) -> bool: | 
|  | 532         s = s.lower() | 
|  | 533         if s == "true" : return True | 
|  | 534         if s == "false": return False | 
|  | 535         raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") | 
|  | 536 | 
|  | 537 class Float: | 
|  | 538     def __init__(self, argName = "Dataset values, not an argument") -> None: | 
|  | 539         self.argName = argName | 
|  | 540 | 
|  | 541     def __call__(self, s :str) -> float: return self.check(s) | 
|  | 542 | 
|  | 543     def check(self, s :str) -> float: | 
|  | 544         try: return float(s) | 
|  | 545         except ValueError: | 
|  | 546             s = s.lower() | 
|  | 547             if s == "nan" or s == "none": return math.nan | 
|  | 548             raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"") | 
|  | 549 | 
|  | 550 # MODELS | 
|  | 551 OldRule = List[Union[str, "OldRule"]] | 
|  | 552 class Model(Enum): | 
|  | 553     """ | 
|  | 554     Represents a metabolic model, either custom or locally supported. Custom models don't point | 
|  | 555     to valid file paths. | 
|  | 556     """ | 
|  | 557 | 
|  | 558     Recon   = "Recon" | 
|  | 559     ENGRO2  = "ENGRO2" | 
|  | 560     ENGRO2_no_legend = "ENGRO2_no_legend" | 
|  | 561     HMRcore = "HMRcore" | 
|  | 562     HMRcore_no_legend = "HMRcore_no_legend" | 
|  | 563     Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. | 
|  | 564 | 
|  | 565     def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: | 
|  | 566         if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") | 
|  | 567 | 
|  | 568     def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: | 
|  | 569         """ | 
|  | 570         Open "rules" file for this model. | 
|  | 571 | 
|  | 572         Returns: | 
|  | 573             Dict[str, Dict[str, OldRule]] : the rules for this model. | 
|  | 574         """ | 
|  | 575         path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") | 
|  | 576         self.__raiseMissingPathErr(path) | 
|  | 577         return readPickle(path) | 
|  | 578 | 
|  | 579     def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]: | 
|  | 580         """ | 
|  | 581         Open "gene translator (old: gene_in_rule)" file for this model. | 
|  | 582 | 
|  | 583         Returns: | 
|  | 584             Dict[str, Dict[str, str]] : the translator dict for this model. | 
|  | 585         """ | 
|  | 586         path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") | 
|  | 587         self.__raiseMissingPathErr(path) | 
|  | 588         return readPickle(path) | 
|  | 589 | 
|  | 590     def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: | 
|  | 591         path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") | 
|  | 592         self.__raiseMissingPathErr(path) | 
|  | 593         return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) | 
|  | 594 | 
|  | 595     def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: | 
|  | 596         if(self is Model.Custom): | 
|  | 597             return self.load_custom_model(customPath, customExtension) | 
|  | 598         else: | 
|  | 599             return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) | 
|  | 600 | 
|  | 601     def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model: | 
|  | 602         ext = ext if ext else file_path.ext | 
|  | 603         try: | 
|  | 604             if ext in FileFormat.XML: | 
|  | 605                 return cobra.io.read_sbml_model(file_path.show()) | 
|  | 606 | 
|  | 607             if ext in FileFormat.JSON: | 
|  | 608                 # Compressed files are not automatically handled by cobra | 
|  | 609                 if(ext == "json"): | 
|  | 610                     return cobra.io.load_json_model(file_path.show()) | 
|  | 611                 else: | 
|  | 612                     return self.extract_json_model(file_path, ext) | 
|  | 613 | 
|  | 614         except Exception as e: raise DataErr(file_path, e.__str__()) | 
|  | 615         raise DataErr(file_path, | 
|  | 616             f"Fomat \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.") | 
|  | 617 | 
|  | 618 | 
|  | 619     def extract_json_model(file_path:FilePath, ext :FileFormat) -> cobra.Model: | 
|  | 620         """ | 
|  | 621         Extract json COBRA model from a compressed file (zip, gz, bz2). | 
|  | 622 | 
|  | 623         Args: | 
|  | 624             file_path: File path of the model | 
|  | 625             ext: File extensions of class FileFormat (should be .zip, .gz or .bz2) | 
|  | 626 | 
|  | 627         Returns: | 
|  | 628             cobra.Model: COBRApy model | 
|  | 629 | 
|  | 630         Raises: | 
|  | 631             Exception: Extraction errors | 
|  | 632         """ | 
|  | 633         ext_str = str(ext) | 
|  | 634 | 
|  | 635         try: | 
|  | 636             if '.zip' in ext_str: | 
|  | 637                 with zipfile.ZipFile(file_path.show(), 'r') as zip_ref: | 
|  | 638                     with zip_ref.open(zip_ref.namelist()[0]) as json_file: | 
|  | 639                         content = json_file.read().decode('utf-8') | 
|  | 640                         return cobra.io.load_json_model(StringIO(content)) | 
|  | 641             elif '.gz' in ext_str: | 
|  | 642                 with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref: | 
|  | 643                     return cobra.io.load_json_model(gz_ref) | 
|  | 644             elif '.bz2' in ext_str: | 
|  | 645                 with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref: | 
|  | 646                     return cobra.io.load_json_model(bz2_ref) | 
|  | 647             else: | 
|  | 648                 raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2") | 
|  | 649 | 
|  | 650         except Exception as e: | 
|  | 651             raise Exception(f"Error during model extraction: {str(e)}") | 
|  | 652 | 
|  | 653 | 
|  | 654 | 
| 240 | 655     def __str__(self) -> str: return self.value |