| 392 | 1 import math | 
|  | 2 import re | 
|  | 3 import sys | 
|  | 4 import csv | 
|  | 5 import pickle | 
|  | 6 import lxml.etree as ET | 
|  | 7 | 
|  | 8 from enum import Enum | 
|  | 9 from itertools import count | 
| 408 | 10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple | 
| 392 | 11 | 
|  | 12 import pandas as pd | 
|  | 13 import cobra | 
| 409 | 14 from cobra import Model as cobraModel, Reaction, Metabolite | 
| 392 | 15 | 
|  | 16 import zipfile | 
|  | 17 import gzip | 
|  | 18 import bz2 | 
|  | 19 from io import StringIO | 
| 411 | 20 import rule_parsing  as rulesUtils | 
|  | 21 import reaction_parsing as reactionUtils | 
| 392 | 22 | 
| 394 | 23 | 
|  | 24 | 
class ValueErr(Exception):
    """
    Plain exception describing a parameter whose value is not acceptable.

    NOTE(review): a second class named ValueErr (a CustomErr subclass) is defined further down in
    this file and rebinds the name once the module has finished loading, so code that looks the
    name up at call time gets that later class, not this one.
    """
    def __init__(self, param_name, expected, actual):
        """
        Builds the exception message from its three components.

        Args:
            param_name : name of the offending parameter.
            expected : description of what was expected.
            actual : what was received instead.
        """
        description = f"Invalid value for {param_name}: expected {expected}, got {actual}"
        super().__init__(description)
|  | 28 | 
class PathErr(Exception):
    """
    Plain exception describing a problem with a file path.

    NOTE(review): a second class named PathErr (a CustomErr subclass) is defined further down in
    this file and rebinds the name once the module has finished loading, so code that looks the
    name up at call time gets that later class, not this one.
    """
    def __init__(self, path, message):
        """
        Builds the exception message.

        Args:
            path : the path that caused the problem.
            message : explanation of what is wrong with it.
        """
        description = f"Path error for '{path}': {message}"
        super().__init__(description)
|  | 32 | 
class FileFormat(Enum):
    """
    Encodes possible file extensions to conditionally save data in a different format.

    The value of every member is the tuple of extension spellings it recognizes.

    NOTE(review): fromExt records the spelling it was given on the (shared) enum member itself, and
    __str__ prefers that recorded spelling. The text of a member therefore depends on the most
    recent fromExt call that resolved to it.
    """
    DAT    = ("dat",) # this is how galaxy treats all your files!
    CSV    = ("csv",) # this is how most editable input data is written
    TSV    = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
    SVG    = ("svg",) # this is how most metabolic maps are written
    PNG    = ("png",) # this is a common output format for images (such as metabolic maps)
    PDF    = ("pdf",) # this is also a common output format for images, as it's required in publications.

    # Model formats also list their compressed variants
    XML    = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
    JSON   = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
    MAT    = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
    YML    = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed

    TXT    = ("txt",) # this is how most output data is written
    PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved

    def __init__(self, *extensions):
        """
        (Private) Receives the member's value tuple unpacked into positional arguments.
        """
        self.extensions = extensions
        # Filled in by fromExt with the spelling the caller used; None until then.
        self._original_extension = None

    @classmethod
    def fromExt(cls, ext: str) -> "FileFormat":
        """
        Converts a file extension string to a FileFormat instance.

        The upper-cased string is first tried as a member name, then the lower-cased string is
        searched among the extension tuples of every member. In both cases the string, exactly as
        received, is stored on the returned member.

        Args:
            ext : The file extension as a string.

        Raises:
            ValueErr : if no member matches the extension.

        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        byName = FileFormat.__members__.get(ext.upper())
        if byName is not None:
            byName._original_extension = ext
            return byName

        lowered = ext.lower()
        byValue = next((member for member in cls if lowered in member.value), None)
        if byValue is None:
            raise ValueErr("ext", "a valid FileFormat file extension", ext)

        byValue._original_extension = ext
        return byValue

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the spelling recorded by fromExt if there is one; otherwise the first listed
            extension for XML, JSON, MAT and YML, and the last listed extension for every other member.
        """
        remembered = getattr(self, '_original_extension', None)
        if remembered:
            return remembered

        # For the model formats the first entry is the plain (uncompressed) extension.
        if self in (FileFormat.XML, FileFormat.JSON, FileFormat.MAT, FileFormat.YML):
            return self.value[0]

        return self.value[-1]
|  | 103 | 
class FilePath:
    """
    Represents a file path. View this as an attempt to standardize file-related operations by expecting
    values of this type in any process requesting a file path.

    A path is kept as three pieces: an optional directory prefix, the file name without extension
    and the extension.
    """
    def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
        """
        (Private) Initializes an instance of FilePath.

        Args:
            filePath : the end of the path, containing the file name (without extension).
            ext : the file's extension.
            prefix : anything before filePath, if the last '/' isn't there it's added by the code.

        Returns:
            None : practically, a FilePath instance.
        """
        if prefix and not prefix.endswith('/'):
            prefix = prefix + '/'

        self.prefix = prefix
        self.filePath = filePath
        self.ext = ext

    @classmethod
    def fromStrPath(cls, path: str) -> "FilePath":
        """
        Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
        It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
        A double extension is only accepted when FileFormat.fromExt recognizes it; otherwise the
        text after the last dot is used as the extension.

        Args:
            path : the string containing the path

        Raises:
            PathErr : if the provided string doesn't represent a valid path or its extension is
            not a known FileFormat.

        Returns:
            FilePath : the constructed instance.
        """
        parsed = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (parsed and parsed["name"] and parsed["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        prefix = parsed["prefix"] or ""
        name = parsed["name"]
        ext = parsed["ext"]

        # With at least two dots the last two dot-separated pieces may form a double extension.
        pieces = path.split(".")
        if len(pieces) >= 3:
            try:
                compoundFormat = FileFormat.fromExt(f"{pieces[-2]}.{pieces[-1]}")
            except ValueErr:
                # Not a known double extension: use the single-extension logic below.
                compoundFormat = None

            if compoundFormat is not None:
                name = ".".join(pieces[:-2])
                # Everything before the double extension may still carry the folders.
                if '/' in name:
                    cut = name.rfind('/') + 1
                    prefix, name = name[:cut], name[cut:]
                return cls(name, compoundFormat, prefix=prefix)

        # Single extension
        try:
            return cls(name, FileFormat.fromExt(ext), prefix=prefix)
        except ValueErr:
            raise PathErr(path, f"unsupported file extension: {ext}")

    def show(self) -> str:
        """
        Shows the path as a string.

        Returns:
            str : prefix, file name, a dot and the extension's text, concatenated.
        """
        return f"{self.prefix}{self.filePath}.{self.ext}"

    def __str__(self) -> str:
        """
        (Private) Same text as show().
        """
        return self.show()
|  | 183 | 
|  | 184 # ERRORS | 
def terminate(msg :str) -> None:
    """
    Stop the script through sys.exit, using an error message as the exit argument.

    Args:
        msg (str): The error message; it is prefixed with "Execution aborted: " and followed by a newline.

    Returns:
        None
    """
    exitMessage = f"Execution aborted: {msg}\n"
    sys.exit(exitMessage)
|  | 196 | 
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Append a warning message to an output log file. The final period and a newline are added by
    the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately turned back into a string (beware relative expensive operation, log with caution).

    Returns:
        None
    """
    # Parsing the path only to show it again looks redundant, but it validates the path here so
    # that callers don't have to. Writing to a file on every call is costly by design: prefer
    # collecting problems and logging them once at the end. A persistent runtime buffer dumped at
    # the end of the program would be the alternative.
    validatedPath = FilePath.fromStrPath(loggerPath).show()
    with open(validatedPath, 'a') as log:
        log.write(f"{msg}.\n")
|  | 217 | 
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.

    Subclasses override the class attribute errName to change the label that opens the message.
    """
    # One counter shared by CustomErr and all of its subclasses.
    __idGenerator = count()
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        # Exception.__init__ is deliberately not called here: the constructor arguments recorded
        # by the base class at creation time are left untouched.
        self.msg     = msg
        self.details = details

        # A counter value is consumed on every construction; the id is whichever is larger
        # between that value and the explicit code.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, logging a warning message before doing so.

        Args:
            loggerPath (str): Path of the log file; nothing is logged when it is empty.

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath:
            logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script, using this error's text as the message.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: "<errName> #<id>: <msg>, <details>." where errName is the one of the instance's
            own class, so that subclasses show their own label.
        """
        # Look errName up on the instance's class instead of hard-coding CustomErr, otherwise
        # every subclass would present itself as "Custom Error".
        return f"{type(self).errName} #{self.id}: {self.msg}, {self.details}."
|  | 271 | 
class ArgsErr(CustomErr):
    """
    CustomErr subclass for UI arguments errors.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ArgsErr.

        Args:
            argName : name of the argument that was given a wrong value.
            expected : what the argument was expected to be.
            actual : what was received instead.
            msg : further details, forwarded as the error's details.
        """
        summary = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(summary, msg)
|  | 279 | 
class DataErr(CustomErr):
    """
    CustomErr subclass for data formatting errors.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of DataErr.

        Args:
            fileName : name shown in the message for the file holding the malformed data.
            msg : further details, forwarded as the error's details.
        """
        summary = f"file \"{fileName}\" contains malformed data"
        super().__init__(summary, msg)
|  | 287 | 
class PathErr(CustomErr):
    """
    CustomErr subclass for filepath formatting errors.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of PathErr.

        Args:
            path : the invalid path; it is only interpolated into the message, and other code in
            this file also passes plain strings here.
            msg : further details, forwarded as the error's details.
        """
        summary = f"path \"{path}\" is invalid"
        super().__init__(summary, msg)
|  | 295 | 
class ValueErr(CustomErr):
    """
    CustomErr subclass for any value error.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ValueErr.

        Args:
            valueName : name of the wrong value; when falsy the name is left out of the message.
            expected : what the value was supposed to be.
            actual : what was received instead.
            msg : further details, forwarded as the error's details.
        """
        quotedName = f"\"{valueName}\" " if valueName else ""
        summary = "value " + quotedName + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(summary, msg)
|  | 303 | 
|  | 304 # RESULT | 
|  | 305 T = TypeVar('T') | 
|  | 306 E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened! | 
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        (Private) Initializes an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.value = value
        self.isOk  = isOk
        self.isErr = not isOk

    @classmethod
    def Ok(cls,  value :T) -> "Result":
        """
        Constructs a new Result instance with a successful operation.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        success = Result(value, isOk = True)
        return success

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Constructs a new Result instance with a failed operation.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        failure = Result(value, isOk = False)
        return failure

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if not self.isOk:
            raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
        return self.value

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        if self.isOk:
            return self.value
        return default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if not self.isOk:
            raise err
        return self.value

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises an Exception, the returned Result is of the error kind and
        holds that exception.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr:
            return self

        try:
            mapped = Result.Ok(mapper(self.value))
        except Exception as e:
            mapped = Result.Err(e)
        return mapped

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises, this method lets the exception through.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk:
            return self

        remapped = mapper(self.value)
        return Result.Err(remapped)

    def __str__(self):
        """
        (Private) Returns "Result::Ok(<value>)" or "Result::Err(<value>)".
        """
        kind = 'Ok' if self.isOk else 'Err'
        return f"Result::{kind}({self.value})"
|  | 440 | 
|  | 441 # FILES | 
def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
    """
    Reads a .csv or .tsv file and returns it as a Pandas DataFrame.

    The file is first read as tab-separated without a header row; only if that attempt raises is
    it read again as comma-separated, taking the first row as header.

    Args:
        path : the path to the dataset file.
        datasetName : the name of the dataset, used in error messages.

    Raises:
        DataErr: If both read attempts fail (for example when pandas thinks the dataset is empty)
        or if the loaded dataset has less than 2 columns.

    Returns:
        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
    """
    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
    # hard to implement anything better. Also, this function's name advertises it as a dataset-specific operation and
    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
    try:
        dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
    # Only ordinary errors trigger the comma-separated retry: KeyboardInterrupt and SystemExit
    # must reach the caller instead of being swallowed here.
    except Exception:
        try:
            dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
        except Exception as err:
            # Chain the original error so its traceback is not lost.
            raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") from err

    if len(dataset.columns) < 2:
        raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
    return dataset
|  | 470 | 
def readPickle(path :FilePath) -> Any:
    """
    Reads the contents of a .pickle file, which needs to exist at the given path.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : the data inside a pickle file, could be anything.
    """
    # NOTE(review): unpickling can execute arbitrary code, only open files from trusted sources.
    with open(path.show(), "rb") as fd:
        content = pickle.load(fd)
    return content
|  | 482 | 
def writePickle(path :FilePath, data :Any) -> None:
    """
    Saves any data in a .pickle file, created (or overwritten) at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    destination = path.show()
    with open(destination, "wb") as fd:
        pickle.dump(data, fd)
|  | 495 | 
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Reads the contents of a .csv file, which needs to exist at the given path.

    Args:
        path : the path to the .csv file.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally
        separated by the delimiter.
    """
    with open(path.show(), "r", newline = "") as fd:
        rows = list(csv.reader(fd, delimiter = delimiter))

    # The flag doubles as the slice start: True drops the first row, False keeps every row.
    return rows[skipHeader:]
|  | 509 | 
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Reads the contents of a .svg file, which needs to exist at the given path.

    Args:
        path : the path to the .svg file.
        customErr : error to raise in place of the parser's own one when parsing fails with an
        XML syntax or schema-parse error.

    Raises:
        customErr : when it is provided and the parser reports one of those errors.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : when no customErr is provided.

    Returns:
        ET.ElementTree : the parsed document.
    """
    try:
        return ET.parse(path.show())
    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        if customErr:
            raise customErr
        raise err
|  | 526 | 
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Saves svg data opened with lxml.etree in a .svg file, created (or overwritten) at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    destination = path.show()
    with open(destination, "wb") as fd:
        # The tree is serialized to bytes, hence the binary mode.
        fd.write(ET.tostring(data))
|  | 539 | 
|  | 540 # UI ARGUMENTS | 
class Bool:
    """
    Callable checker that converts the strings "true" / "false" (not case sensitive) to a bool.
    """
    def __init__(self, argName :str) -> None:
        """
        Args:
            argName : name of the argument being checked, reported in the error.
        """
        self.argName = argName

    def __call__(self, s :str) -> bool:
        """
        Same as check, lets an instance be used as a plain conversion function.
        """
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Converts a boolean string to its value.

        Args:
            s : the string to convert.

        Raises:
            ArgsErr : if the lower-cased string is neither "true" nor "false".

        Returns:
            bool : the converted value.
        """
        lowered = s.lower()
        outcome = {"true": True, "false": False}.get(lowered)
        if outcome is None:
            raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")
        return outcome
|  | 552 | 
class Float:
    """
    Callable checker that converts numeric strings to float, also accepting "None" and "NaN"
    (not case sensitive) as NaN.
    """
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        """
        Args:
            argName : name of the argument being checked, reported in the error.
        """
        self.argName = argName

    def __call__(self, s :str) -> float:
        """
        Same as check, lets an instance be used as a plain conversion function.
        """
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Converts a string to float.

        Args:
            s : the string to convert.

        Raises:
            ArgsErr : if float() rejects the string and its lower-cased form is neither "nan" nor "none".

        Returns:
            float : the converted value, math.nan for the "nan" / "none" spellings.
        """
        try:
            parsed = float(s)
        except ValueError:
            lowered = s.lower()
            if lowered in ("nan", "none"):
                return math.nan
            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
        return parsed
|  | 565 | 
|  | 566 # MODELS | 
|  | 567 OldRule = List[Union[str, "OldRule"]] | 
|  | 568 class Model(Enum): | 
|  | 569     """ | 
|  | 570     Represents a metabolic model, either custom or locally supported. Custom models don't point | 
|  | 571     to valid file paths. | 
|  | 572     """ | 
|  | 573 | 
|  | 574     Recon   = "Recon" | 
|  | 575     ENGRO2  = "ENGRO2" | 
|  | 576     ENGRO2_no_legend = "ENGRO2_no_legend" | 
|  | 577     HMRcore = "HMRcore" | 
|  | 578     HMRcore_no_legend = "HMRcore_no_legend" | 
|  | 579     Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. | 
|  | 580 | 
|  | 581     def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: | 
|  | 582         if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") | 
|  | 583 | 
|  | 584     def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: | 
|  | 585         """ | 
|  | 586         Open "rules" file for this model. | 
|  | 587 | 
|  | 588         Returns: | 
|  | 589             Dict[str, Dict[str, OldRule]] : the rules for this model. | 
|  | 590         """ | 
|  | 591         path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") | 
|  | 592         self.__raiseMissingPathErr(path) | 
|  | 593         return readPickle(path) | 
|  | 594 | 
|  | 595     def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]: | 
|  | 596         """ | 
|  | 597         Open "gene translator (old: gene_in_rule)" file for this model. | 
|  | 598 | 
|  | 599         Returns: | 
|  | 600             Dict[str, Dict[str, str]] : the translator dict for this model. | 
|  | 601         """ | 
|  | 602         path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") | 
|  | 603         self.__raiseMissingPathErr(path) | 
|  | 604         return readPickle(path) | 
|  | 605 | 
|  | 606     def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: | 
|  | 607         path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") | 
|  | 608         self.__raiseMissingPathErr(path) | 
|  | 609         return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) | 
|  | 610 | 
|  | 611     def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: | 
|  | 612         if(self is Model.Custom): | 
|  | 613             return self.load_custom_model(customPath, customExtension) | 
|  | 614         else: | 
|  | 615             return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) | 
|  | 616 | 
def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
    """
    Load a user-supplied COBRA model, picking the reader from the file extension.

    Args:
        file_path: location of the model; its ``ext`` is used when no override is given.
        ext: optional extension override (takes precedence over ``file_path.ext``).

    Returns:
        cobra.Model: the loaded model.

    Raises:
        DataErr: if the reader fails for any reason (the original error is chained),
            or if the extension is not one of the XML, JSON, MAT or YML variants.
    """
    ext = ext if ext else file_path.ext
    # Every comparison below uses the string form, so enum members and plain
    # strings are handled the same way as in the membership tests.
    ext_str = str(ext)

    # (format family, uncompressed extension, name of the cobra.io loader)
    plain_formats = (
        (FileFormat.JSON, "json", "load_json_model"),
        (FileFormat.MAT,  "mat",  "load_matlab_model"),
        (FileFormat.YML,  "yml",  "load_yaml_model"),
    )

    try:
        # XML variants (compressed ones too) are handed straight to the SBML reader.
        if ext_str in FileFormat.XML.value:
            return cobra.io.read_sbml_model(file_path.show())

        for family, plain_ext, loader_name in plain_formats:
            if ext_str not in family.value:
                continue

            if ext_str == plain_ext:
                return getattr(cobra.io, loader_name)(file_path.show())

            # Compressed files are not automatically handled by cobra
            return self.extract_model(file_path, ext, plain_ext)

    except Exception as e: raise DataErr(file_path, e.__str__()) from e

    raise DataErr(file_path,
        f"Format \"{ext_str}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")
|  | 647 | 
|  | 648 | 
def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
    """
    Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).

    The archive is decompressed in memory. JSON and YAML payloads are decoded as
    UTF-8 text; MAT payloads are binary and are passed to the loader as raw bytes.
    For zip archives only the first member is read.

    Args:
        file_path: File path of the model
        ext: File extension; its string form must contain ".zip", ".gz" or ".bz2"
        model_encoding: Format of the compressed payload ("json", "mat" or "yml")

    Returns:
        cobra.Model: COBRApy model

    Raises:
        Exception: Any extraction or loading error, with the original error chained
    """
    from io import BytesIO  # local import: only StringIO is imported at file level

    ext_str = str(ext)

    try:
        archive_path = file_path.show()

        # Step 1: get the decompressed payload as bytes, whatever the container.
        if '.zip' in ext_str:
            with zipfile.ZipFile(archive_path, 'r') as archive:
                members = archive.namelist()
                if not members:
                    raise ValueError("the zip archive contains no files")
                payload = archive.read(members[0])
        elif '.gz' in ext_str:
            with gzip.open(archive_path, 'rb') as stream:
                payload = stream.read()
        elif '.bz2' in ext_str:
            with bz2.open(archive_path, 'rb') as stream:
                payload = stream.read()
        else:
            raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")

        # Step 2: hand the payload to the matching loader.
        if model_encoding == "json":
            return cobra.io.load_json_model(StringIO(payload.decode('utf-8')))
        if model_encoding == "yml":
            return cobra.io.load_yaml_model(StringIO(payload.decode('utf-8')))
        if model_encoding == "mat":
            # MAT files are binary: decoding them as text would corrupt or reject them.
            return cobra.io.load_matlab_model(BytesIO(payload))

        raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")

    except Exception as e:
        raise Exception(f"Error during model extraction: {str(e)}") from e
|  | 703 | 
|  | 704 | 
|  | 705 | 
def __str__(self) -> str:
    """Use the member's ``value`` as its string representation."""
    return self.value
|  | 707 | 
|  | 708 | 
def convert_genes(model,annotation):
    """
    Return a copy of the model whose gene ids are replaced by a value from each gene's notes.

    Args:
        model: the model to convert; it is copied, never modified.
        annotation: key looked up in every gene's ``notes`` to obtain the new gene id.

    Returns:
        The renamed copy of the model, or -1 (after printing a message) when some
        gene has no ``notes[annotation]`` entry.
    """
    # Build the mapping first so that a failing lookup does not pay for a full model copy.
    try:
        new_ids = {gene.id: gene.notes[annotation] for gene in model.genes}
    except (KeyError, TypeError, AttributeError):
        print("No annotation in gene dict!")
        return -1

    from cobra.manipulation import rename_genes

    renamed_model = model.copy()
    rename_genes(renamed_model, new_ids)

    return renamed_model
|  | 720 | 
|  | 721 | 
def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobra.Model:
    """
    Build a COBRApy model from a tab-separated table of reactions.

    The table is read with the columns ``ReactionID``, ``Reaction`` (formula),
    ``lower_bound``, ``upper_bound`` and ``Rule``; ``InMedium`` is read by
    set_medium_from_data. Rows without a formula, and rows whose formula cannot
    be parsed, are skipped and counted.

    Args:
        csv_path: Path to the input file (tab-separated despite the name)
        model_id: Id given to the new model

    Returns:
        cobra.Model: The assembled model
    """

    # Read the reaction table
    df = pd.read_csv(csv_path, sep='\t')

    # Start from an empty model
    model = cobraModel(model_id)

    # Metabolites and compartments discovered so far, keyed by id
    metabolites_dict = {}
    compartments_dict = {}

    print(f"Costruendo modello da {len(df)} reazioni...")

    # First pass: collect metabolites and compartments from the reaction formulas
    for _, row in df.iterrows():
        reaction_formula = str(row['Reaction']).strip()
        if not reaction_formula or reaction_formula == 'nan':
            continue

        # Sorted so that the metabolite order in the model is reproducible across runs
        for met_id in sorted(extract_metabolites_from_reaction(reaction_formula)):
            if met_id in metabolites_dict:
                continue

            compartment = extract_compartment_from_metabolite(met_id)
            compartments_dict.setdefault(compartment, compartment)

            # Strip the compartment only as a trailing suffix: a plain replace would
            # also eat inner matches (e.g. the "_c" inside "3hb_coa_c").
            suffix = f"_{compartment}"
            base_name = met_id[:-len(suffix)] if met_id.endswith(suffix) else met_id

            metabolites_dict[met_id] = Metabolite(
                id=met_id,
                compartment=compartment,
                name=base_name.replace("__", "_")
            )

    # Register compartments and metabolites on the model
    model.compartments = compartments_dict
    model.add_metabolites(list(metabolites_dict.values()))

    print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")

    # Second pass: add the reactions
    reactions_added = 0
    reactions_skipped = 0

    for idx, row in df.iterrows():
        # Placeholder so the error message below is meaningful even when reading
        # the id itself is what failed.
        reaction_id = f"<row {idx}>"
        try:
            reaction_id = str(row['ReactionID']).strip()
            reaction_formula = str(row['Reaction']).strip()

            # Skip reactions without a formula
            if not reaction_formula or reaction_formula == 'nan':
                reactions_skipped += 1
                continue

            reaction = Reaction(reaction_id)
            reaction.name = reaction_id

            # Bounds default to a fully reversible [-1000, 1000] when missing
            reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
            reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0

            # Gene rule, when present
            if pd.notna(row['Rule']) and str(row['Rule']).strip():
                reaction.gene_reaction_rule = str(row['Rule']).strip()

            # Stoichiometry from the formula
            try:
                parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
            except Exception as e:
                print(f"Errore nel parsing della reazione {reaction_id}: {e}")
                reactions_skipped += 1
                continue

            model.add_reactions([reaction])
            reactions_added += 1

        except Exception as e:
            print(f"Errore nell'aggiungere la reazione {reaction_id}: {e}")
            reactions_skipped += 1
            continue

    print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")

    # Objective: first reaction whose id mentions biomass
    set_biomass_objective(model)

    # Medium: taken from the InMedium column
    set_medium_from_data(model, df)

    print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")

    return model
|  | 832 | 
|  | 833 | 
# Collects every metabolite id found in a formula (ids may start with digits or underscores)
def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
    """
    Return the set of metabolite ids that appear in a reaction formula.

    An id is any run of letters, digits and underscores that ends with
    ``_<lowercase letters>`` (for example ``_c``, ``_m``, ``_e``). A numeric
    coefficient followed by whitespace in front of an id is not part of the id.
    """
    # optional coefficient, then a token ending in _<letters>
    token_pattern = re.compile(r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)')
    return {found.group(1) for found in token_pattern.finditer(reaction_formula)}
|  | 847 | 
|  | 848 | 
def extract_compartment_from_metabolite(metabolite_id: str) -> str:
    """
    Return the compartment encoded in a metabolite id.

    The compartment is whatever follows the last underscore of the id; ids
    without any underscore fall back to 'c'.
    """
    _, separator, tail = metabolite_id.rpartition('_')
    return tail if separator else 'c'
|  | 857 | 
|  | 858 | 
def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
    """
    Parse a reaction formula and add its stoichiometry to the reaction.

    The formula is split on the first arrow found among ``<=>``, ``<--``, ``-->``
    and ``<-`` (checked in that order). Whatever the arrow, metabolites on the
    left get negative coefficients and those on the right positive ones: the
    arrow does not reorder the sides, and this function never touches the
    reaction bounds. A metabolite that appears on both sides contributes its net
    coefficient; metabolites that net to zero are left out.

    Args:
        reaction: the reaction that receives the metabolites.
        formula: the reaction formula to parse.
        metabolites_dict: known metabolites by id; ids missing from it are ignored.

    Raises:
        ValueError: if the formula has no recognised arrow, or the arrow occurs more than once.
    """
    for arrow in ('<=>', '<--', '-->', '<-'):
        if arrow in formula:
            sides = formula.split(arrow)
            break
    else:
        raise ValueError(f"Formato reazione non riconosciuto: {formula}")

    if len(sides) != 2:
        raise ValueError(f"Formato reazione non riconosciuto: {formula}")
    left, right = sides

    # Accumulate signed coefficients: reactants count negative, products positive.
    net_stoichiometry = {}
    for side, sign in ((left, -1.0), (right, 1.0)):
        for met_id, coeff in parse_metabolites_side(side.strip()).items():
            metabolite = metabolites_dict.get(met_id)
            if metabolite is None:
                continue
            net_stoichiometry[metabolite] = net_stoichiometry.get(metabolite, 0.0) + sign * coeff

    reaction.add_metabolites(
        {metabolite: coeff for metabolite, coeff in net_stoichiometry.items() if coeff != 0}
    )
|  | 903 | 
|  | 904 | 
def parse_metabolites_side(side_str: str) -> Dict[str, float]:
    """
    Parse one side of a reaction formula into metabolite ids and coefficients.

    Terms are separated by ``+``. Each term is an optional numeric coefficient
    (plain, decimal or with an exponent such as ``1e-05``) followed by whitespace
    and a metabolite id ending in ``_<lowercase letters>``. A missing coefficient
    counts as 1. Coefficients of a metabolite listed more than once are summed.
    Terms that do not match this shape are ignored.

    Args:
        side_str: the text of one side of the formula (may be empty).

    Returns:
        Dict[str, float]: coefficient of every metabolite found, keyed by id.
    """
    metabolites = {}
    if not side_str or side_str.strip() == '':
        return metabolites

    # optional coefficient + id ending in _<compartment>; anchored at the start of the term
    term_pattern = re.compile(r'(?:(\d+\.?\d*(?:[eE][+-]?\d+)?)\s+)?([A-Za-z0-9_]+_[a-z]+)')

    for term in side_str.split('+'):
        match = term_pattern.match(term.strip())
        if not match:
            continue

        coeff_str, met_id = match.groups()
        coeff = float(coeff_str) if coeff_str else 1.0
        metabolites[met_id] = metabolites.get(met_id, 0.0) + coeff

    return metabolites
|  | 927 | 
|  | 928 | 
|  | 929 | 
def set_biomass_objective(model: "cobra.Model"):
    """
    Set the model objective to the first reaction whose id contains "biomass".

    The match is case-insensitive and follows the order of ``model.reactions``.
    When no reaction matches, the objective is left untouched and a message is printed.

    Args:
        model: the model to update in place.
    """
    biomass_id = next(
        (reaction.id for reaction in model.reactions if 'biomass' in reaction.id.lower()),
        None,
    )

    if biomass_id is None:
        print("Nessuna reazione di biomassa trovata")
        return

    model.objective = biomass_id
    print(f"Obiettivo impostato su: {biomass_id}")
|  | 941 | 
|  | 942 | 
def set_medium_from_data(model: "cobra.Model", df: pd.DataFrame):
    """
    Set the model medium from the rows flagged in the InMedium column.

    For every row whose ``InMedium`` is True, the reaction named by ``ReactionID``
    is added to the medium with ``abs(lower_bound)`` as its value, provided the
    reaction exists in the model and its lower bound is negative. The medium is
    only assigned when at least one reaction qualifies.

    Args:
        model: the model to update in place.
        df: table with at least the ``InMedium`` and ``ReactionID`` columns.
    """
    # Element-wise comparison on purpose: this builds a boolean mask over the column.
    medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()

    # Index the reactions once instead of rebuilding the id list for every row.
    reactions_by_id = {reaction.id: reaction for reaction in model.reactions}

    medium_dict = {}
    for raw_id in medium_reactions:
        # Same normalisation the model builder applies to reaction ids.
        rxn_id = str(raw_id).strip()
        reaction = reactions_by_id.get(rxn_id)
        if reaction is not None and reaction.lower_bound < 0:  # uptake reactions only
            medium_dict[rxn_id] = abs(reaction.lower_bound)

    if medium_dict:
        model.medium = medium_dict
        print(f"Medium impostato con {len(medium_dict)} componenti")
|  | 959 | 
|  | 960 | 
def validate_model(model: "cobra.Model") -> Dict[str, Any]:
    """
    Collect basic statistics about the model and try to optimize it.

    Args:
        model: the model to inspect; ``optimize()`` is called on it.

    Returns:
        Dict[str, Any]: counts of reactions, metabolites, genes and compartments,
        the objective as a string, the medium size, the number of reversible
        reactions and of reactions whose id starts with "EX_", plus
        ``growth_rate`` and ``status`` from the optimization. If optimizing
        raises, ``growth_rate`` is None and ``status`` holds the error text.
    """
    validation = {
        'num_reactions': len(model.reactions),
        'num_metabolites': len(model.metabolites),
        'num_genes': len(model.genes),
        'num_compartments': len(model.compartments),
        'objective': str(model.objective),
        'medium_size': len(model.medium),
        'reversible_reactions': sum(1 for r in model.reactions if r.reversibility),
        'exchange_reactions': sum(1 for r in model.reactions if r.id.startswith('EX_')),
    }

    try:
        # Growth test
        solution = model.optimize()
        validation['growth_rate'] = solution.objective_value
        validation['status'] = solution.status
    except Exception as e:
        validation['growth_rate'] = None
        validation['status'] = f"Error: {e}"

    return validation
| 411 | 986 | 
|  | 987 | 
|  | 988 ################################- DATA GENERATION -################################ | 
ReactionId = str
def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
    """
    Generates a dictionary mapping reaction ids to rules from the model.

    Reactions with an empty gene reaction rule are left out.

    Args:
        model : the model to derive data from.
        asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.

    Returns:
        Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
        Dict[ReactionId, str] : the generated dictionary of raw rules.
    """
    # Collect the raw rule strings first; parsing, if requested, is a second step
    # so the asParsed check happens once instead of once per reaction.
    rawRules = {}
    for reaction in model.reactions:
        rule = reaction.gene_reaction_rule
        if rule:
            rawRules[reaction.id] = rule

    if not asParsed:
        return rawRules

    return {
        reactionId : rulesUtils.parseRuleToNestedList(rule)
        for reactionId, rule in rawRules.items() }
|  | 1013 | 
def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]:
    """
    Generates a dictionary mapping reaction ids to reaction formulas from the model.

    Reactions whose formula string is empty are left out.

    Args:
        model : the model to derive data from.
        asParsed : if True the formulas are passed through reactionUtils.create_reaction_dict,
            otherwise they are returned as strings.

    Returns:
        Dict[ReactionId, str] : the generated dictionary.
    """
    formulas = {}
    for reaction in model.reactions:
        formula = reaction.reaction
        if formula:
            formulas[reaction.id] = formula

    return reactionUtils.create_reaction_dict(formulas) if asParsed else formulas
|  | 1035 | 
def get_medium(model:"cobra.Model") -> pd.DataFrame:
    """
    List the reactions that can only take metabolites up.

    A reaction is reported when its lower bound is negative and none of its
    metabolites has a positive coefficient (reactions with no metabolites at all
    therefore qualify too).

    Args:
        model: the model to inspect.

    Returns:
        pd.DataFrame: a single "reaction" column with the ids of the matching reactions.
    """
    # Coefficients are read straight from the reaction's metabolite mapping,
    # stopping at the first positive one, rather than looked up by id one at a time.
    trueMedium = [
        r.id
        for r in model.reactions
        if r.lower_bound < 0 and not any(coeff > 0 for coeff in r.metabolites.values())
    ]

    df_medium = pd.DataFrame()
    df_medium["reaction"] = trueMedium
    return df_medium
|  | 1049 | 
def generate_bounds(model:"cobra.Model") -> pd.DataFrame:
    """
    Build a table with the lower and upper bound of every reaction.

    The table is assembled in a single pass, so the two columns get the numeric
    dtype pandas infers from the bounds.

    Args:
        model: the model to read bounds from.

    Returns:
        pd.DataFrame: indexed by reaction id, with "lower_bound" and "upper_bound" columns.
    """
    rxns = []
    lower_bounds = []
    upper_bounds = []
    for reaction in model.reactions:
        rxns.append(reaction.id)
        lower_bounds.append(reaction.lower_bound)
        upper_bounds.append(reaction.upper_bound)

    return pd.DataFrame(
        {"lower_bound": lower_bounds, "upper_bound": upper_bounds},
        index=rxns,
    )
|  | 1061 | 
|  | 1062 | 
|  | 1063 | 
def generate_compartments(model: "cobra.Model") -> pd.DataFrame:
    """
    Generates a DataFrame listing, for each reaction, the entries of its 'pathways' annotation.

    Despite the function name, the data read here is ``reaction.annotation['pathways']``.
    One column is created per position (Pathway_1, Pathway_2, ...), up to the
    longest list found in the model; shorter lists are padded with None. A
    non-list annotation value counts as a single pathway, and a reaction
    without the annotation gets only None values.

    Args:
        model: the COBRA model to extract the annotation from.

    Returns:
        pd.DataFrame: DataFrame with ReactionID and Pathway_N columns
    """
    # First pass: normalise every annotation to a list
    reaction_pathways = {}
    for reaction in model.reactions:
        pathways = reaction.annotation.get('pathways')
        if pathways is None:
            pathways = []
        elif not isinstance(pathways, list):
            pathways = [pathways]
        reaction_pathways[reaction.id] = pathways

    # The widest list (single values included) decides how many columns are needed
    max_pathways = max((len(pathways) for pathways in reaction_pathways.values()), default=0)
    pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]

    # Second pass: one row per reaction, padded to the common width
    pathway_data = []
    for reaction_id, pathways in reaction_pathways.items():
        row = {"ReactionID": reaction_id}
        padded = list(pathways) + [None] * (max_pathways - len(pathways))
        row.update(zip(pathway_columns, padded))
        pathway_data.append(row)

    # Explicit columns keep the ReactionID column even for a model with no reactions
    return pd.DataFrame(pathway_data, columns=["ReactionID", *pathway_columns])