| 
392
 | 
     1 import math
 | 
| 
 | 
     2 import re
 | 
| 
 | 
     3 import sys
 | 
| 
 | 
     4 import csv
 | 
| 
 | 
     5 import pickle
 | 
| 
 | 
     6 import lxml.etree as ET
 | 
| 
 | 
     7 
 | 
| 
 | 
     8 from enum import Enum
 | 
| 
 | 
     9 from itertools import count
 | 
| 
408
 | 
    10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple
 | 
| 
392
 | 
    11 
 | 
| 
 | 
    12 import pandas as pd
 | 
| 
 | 
    13 import cobra
 | 
| 
409
 | 
    14 from cobra import Model as cobraModel, Reaction, Metabolite
 | 
| 
392
 | 
    15 
 | 
| 
 | 
    16 import zipfile
 | 
| 
 | 
    17 import gzip
 | 
| 
 | 
    18 import bz2
 | 
| 
 | 
    19 from io import StringIO
 | 
| 
413
 | 
    20 import os
 | 
| 
 | 
    21 sys.path.insert(0, os.path.dirname(__file__)) 
 | 
| 
 | 
    22 import rule_parsing  as rulesUtils
 | 
| 
 | 
    23 import reaction_parsing as reactionUtils
 | 
| 
 | 
    24 
 | 
| 
392
 | 
    25 
 | 
| 
394
 | 
    26 
 | 
| 
 | 
    27 
 | 
| 
392
 | 
    28 class ValueErr(Exception):
 | 
| 
 | 
    29     def __init__(self, param_name, expected, actual):
 | 
| 
 | 
    30         super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
 | 
| 
 | 
    31 
 | 
| 
 | 
    32 class PathErr(Exception):
 | 
| 
 | 
    33     def __init__(self, path, message):
 | 
| 
 | 
    34         super().__init__(f"Path error for '{path}': {message}")
 | 
| 
 | 
    35 
 | 
| 
 | 
    36 class FileFormat(Enum):
 | 
| 
 | 
    37     """
 | 
| 
 | 
    38     Encodes possible file extensions to conditionally save data in a different format.
 | 
| 
 | 
    39     """
 | 
| 
 | 
    40     DAT    = ("dat",) # this is how galaxy treats all your files!
 | 
| 
 | 
    41     CSV    = ("csv",) # this is how most editable input data is written
 | 
| 
 | 
    42     TSV    = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
 | 
| 
 | 
    43     SVG    = ("svg",) # this is how most metabolic maps are written
 | 
| 
 | 
    44     PNG    = ("png",) # this is a common output format for images (such as metabolic maps)
 | 
| 
 | 
    45     PDF    = ("pdf",) # this is also a common output format for images, as it's required in publications.
 | 
| 
 | 
    46     
 | 
| 
 | 
    47     # Updated to include compressed variants
 | 
| 
 | 
    48     XML    = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
 | 
| 
 | 
    49     JSON   = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
 | 
| 
 | 
    50     MAT    = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
 | 
| 
 | 
    51     YML    = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed
 | 
| 
 | 
    52 
 | 
| 
 | 
    53     TXT    = ("txt",) # this is how most output data is written
 | 
| 
 | 
    54     PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
 | 
| 
 | 
    55 
 | 
| 
 | 
    56     def __init__(self, *extensions):
 | 
| 
 | 
    57         self.extensions = extensions
 | 
| 
 | 
    58         # Store original extension when set via fromExt
 | 
| 
 | 
    59         self._original_extension = None
 | 
| 
 | 
    60 
 | 
| 
 | 
    61     @classmethod
 | 
| 
 | 
    62     def fromExt(cls, ext: str) -> "FileFormat":
 | 
| 
 | 
    63         """
 | 
| 
 | 
    64         Converts a file extension string to a FileFormat instance.
 | 
| 
 | 
    65         Args:
 | 
| 
 | 
    66             ext : The file extension as a string.
 | 
| 
 | 
    67         Returns:
 | 
| 
 | 
    68             FileFormat: The FileFormat instance corresponding to the file extension.
 | 
| 
 | 
    69         """
 | 
| 
 | 
    70         variantName = ext.upper()
 | 
| 
 | 
    71         if variantName in FileFormat.__members__: 
 | 
| 
 | 
    72             instance = FileFormat[variantName]
 | 
| 
 | 
    73             instance._original_extension = ext
 | 
| 
 | 
    74             return instance
 | 
| 
 | 
    75         
 | 
| 
 | 
    76         variantName = ext.lower()
 | 
| 
 | 
    77         for member in cls:
 | 
| 
 | 
    78             if variantName in member.value: 
 | 
| 
 | 
    79                 # Create a copy-like behavior by storing the original extension
 | 
| 
 | 
    80                 member._original_extension = ext
 | 
| 
 | 
    81                 return member
 | 
| 
 | 
    82         
 | 
| 
 | 
    83         raise ValueErr("ext", "a valid FileFormat file extension", ext)
 | 
| 
 | 
    84 
 | 
| 
 | 
    85     def __str__(self) -> str:
 | 
| 
 | 
    86         """
 | 
| 
 | 
    87         (Private) converts to str representation. Good practice for usage with argparse.
 | 
| 
 | 
    88         Returns:
 | 
| 
 | 
    89             str : the string representation of the file extension.
 | 
| 
 | 
    90         """
 | 
| 
 | 
    91         # If we have an original extension stored (for compressed files only), use it
 | 
| 
 | 
    92         if hasattr(self, '_original_extension') and self._original_extension:
 | 
| 
 | 
    93             return self._original_extension
 | 
| 
 | 
    94         
 | 
| 
 | 
    95         # For XML, JSON, MAT and YML without original extension, use the base extension
 | 
| 
 | 
    96         if self == FileFormat.XML:
 | 
| 
 | 
    97             return "xml"
 | 
| 
 | 
    98         elif self == FileFormat.JSON:
 | 
| 
 | 
    99             return "json"
 | 
| 
 | 
   100         elif self == FileFormat.MAT:
 | 
| 
 | 
   101             return "mat"
 | 
| 
 | 
   102         elif self == FileFormat.YML:
 | 
| 
 | 
   103             return "yml"
 | 
| 
 | 
   104         
 | 
| 
 | 
   105         return self.value[-1]
 | 
| 
 | 
   106 
 | 
| 
 | 
   107 class FilePath():
 | 
| 
 | 
   108     """
 | 
| 
 | 
   109     Represents a file path. View this as an attempt to standardize file-related operations by expecting
 | 
| 
 | 
   110     values of this type in any process requesting a file path.
 | 
| 
 | 
   111     """
 | 
| 
 | 
   112     def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
 | 
| 
 | 
   113         """
 | 
| 
 | 
   114         (Private) Initializes an instance of FilePath.
 | 
| 
 | 
   115         Args:
 | 
| 
 | 
   116             path : the end of the path, containing the file name.
 | 
| 
 | 
   117             ext : the file's extension.
 | 
| 
 | 
   118             prefix : anything before path, if the last '/' isn't there it's added by the code.
 | 
| 
 | 
   119         Returns:
 | 
| 
 | 
   120             None : practically, a FilePath instance.
 | 
| 
 | 
   121         """
 | 
| 
 | 
   122         self.ext = ext
 | 
| 
 | 
   123         self.filePath = filePath
 | 
| 
 | 
   124 
 | 
| 
 | 
   125         if prefix and prefix[-1] != '/': 
 | 
| 
 | 
   126             prefix += '/'
 | 
| 
 | 
   127         self.prefix = prefix
 | 
| 
 | 
   128     
 | 
| 
 | 
   129     @classmethod
 | 
| 
 | 
   130     def fromStrPath(cls, path: str) -> "FilePath":
 | 
| 
 | 
   131         """
 | 
| 
 | 
   132         Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
 | 
| 
 | 
   133         It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
 | 
| 
 | 
   134         These double extensions are not supported for other file types such as .csv.
 | 
| 
 | 
   135         Args:
 | 
| 
 | 
   136             path : the string containing the path
 | 
| 
 | 
   137         Raises:
 | 
| 
 | 
   138             PathErr : if the provided string doesn't represent a valid path.
 | 
| 
 | 
   139         Returns:
 | 
| 
 | 
   140             FilePath : the constructed instance.
 | 
| 
 | 
   141         """
 | 
| 
 | 
   142         result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
 | 
| 
 | 
   143         if not result or not result["name"] or not result["ext"]:
 | 
| 
 | 
   144             raise PathErr(path, "cannot recognize folder structure or extension in path")
 | 
| 
 | 
   145 
 | 
| 
 | 
   146         prefix = result["prefix"] if result["prefix"] else ""
 | 
| 
 | 
   147         name, ext = result["name"], result["ext"]
 | 
| 
 | 
   148 
 | 
| 
 | 
   149         # Check for double extensions (json.gz, xml.zip, etc.)
 | 
| 
 | 
   150         parts = path.split(".")
 | 
| 
 | 
   151         if len(parts) >= 3:  
 | 
| 
 | 
   152             penultimate = parts[-2]
 | 
| 
 | 
   153             last = parts[-1]
 | 
| 
 | 
   154             double_ext = f"{penultimate}.{last}"
 | 
| 
 | 
   155             
 | 
| 
 | 
   156             # Try the double extension first
 | 
| 
 | 
   157             try:
 | 
| 
 | 
   158                 ext_format = FileFormat.fromExt(double_ext)
 | 
| 
 | 
   159                 name = ".".join(parts[:-2])
 | 
| 
 | 
   160                 # Extract prefix if it exists
 | 
| 
 | 
   161                 if '/' in name:
 | 
| 
 | 
   162                     prefix = name[:name.rfind('/') + 1]
 | 
| 
 | 
   163                     name = name[name.rfind('/') + 1:]
 | 
| 
 | 
   164                 return cls(name, ext_format, prefix=prefix)
 | 
| 
 | 
   165             except ValueErr:
 | 
| 
 | 
   166                 # If double extension doesn't work, fall back to single extension
 | 
| 
 | 
   167                 pass
 | 
| 
 | 
   168 
 | 
| 
 | 
   169         # Single extension fallback (original logic)
 | 
| 
 | 
   170         try:
 | 
| 
 | 
   171             ext_format = FileFormat.fromExt(ext)
 | 
| 
 | 
   172             return cls(name, ext_format, prefix=prefix)
 | 
| 
 | 
   173         except ValueErr:
 | 
| 
 | 
   174             raise PathErr(path, f"unsupported file extension: {ext}")
 | 
| 
 | 
   175 
 | 
| 
 | 
   176     def show(self) -> str:
 | 
| 
 | 
   177         """
 | 
| 
 | 
   178         Shows the path as a string.
 | 
| 
 | 
   179         Returns:
 | 
| 
 | 
   180             str : the path shown as a string.
 | 
| 
 | 
   181         """
 | 
| 
 | 
   182         return f"{self.prefix}{self.filePath}.{self.ext}"
 | 
| 
 | 
   183     
 | 
| 
 | 
   184     def __str__(self) -> str: 
 | 
| 
 | 
   185         return self.show()
 | 
| 
 | 
   186 
 | 
| 
 | 
   187 # ERRORS
 | 
| 
 | 
   188 def terminate(msg :str) -> None:
 | 
| 
 | 
   189     """
 | 
| 
 | 
   190     Terminate the execution of the script with an error message.
 | 
| 
 | 
   191     
 | 
| 
 | 
   192     Args:
 | 
| 
 | 
   193         msg (str): The error message to be displayed.
 | 
| 
 | 
   194     
 | 
| 
 | 
   195     Returns:
 | 
| 
 | 
   196         None
 | 
| 
 | 
   197     """
 | 
| 
 | 
   198     sys.exit(f"Execution aborted: {msg}\n")
 | 
| 
 | 
   199 
 | 
| 
 | 
   200 def logWarning(msg :str, loggerPath :str) -> None:
 | 
| 
 | 
   201     """
 | 
| 
 | 
   202     Log a warning message to an output log file and print it to the console. The final period and a
 | 
| 
 | 
   203     newline is added by the function.
 | 
| 
 | 
   204 
 | 
| 
 | 
   205     Args:
 | 
| 
 | 
   206         s (str): The warning message to be logged and printed.
 | 
| 
 | 
   207         loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
 | 
| 
 | 
   208         immediately read back (beware relative expensive operation, log with caution).
 | 
| 
 | 
   209 
 | 
| 
 | 
   210     Returns:
 | 
| 
 | 
   211         None
 | 
| 
 | 
   212     """
 | 
| 
 | 
   213     # building the path and then reading it immediately seems useless, but it's actually a way of
 | 
| 
 | 
   214     # validating that reduces repetition on the caller's side. Besides, logging a message by writing
 | 
| 
 | 
   215     # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
 | 
| 
 | 
   216     # mindlessly logging whenever something comes up, log at the very end and tell the user everything
 | 
| 
 | 
   217     # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
 | 
| 
 | 
   218     # the file only at the end of the program's execution.
 | 
| 
 | 
   219     with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
 | 
| 
 | 
   220 
 | 
| 
 | 
   221 class CustomErr(Exception):
 | 
| 
 | 
   222     """
 | 
| 
 | 
   223     Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
 | 
| 
 | 
   224     """
 | 
| 
 | 
   225     __idGenerator = count()
 | 
| 
 | 
   226     errName = "Custom Error"
 | 
| 
 | 
   227     def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
 | 
| 
 | 
   228         """
 | 
| 
 | 
   229         (Private) Initializes an instance of CustomErr.
 | 
| 
 | 
   230 
 | 
| 
 | 
   231         Args:
 | 
| 
 | 
   232             msg (str): Error message to be displayed.
 | 
| 
 | 
   233             details (str): Informs the user more about the error encountered. Defaults to "".
 | 
| 
 | 
   234             explicitErrCode (int): Explicit error code to be used. Defaults to -1.
 | 
| 
 | 
   235         
 | 
| 
 | 
   236         Returns:
 | 
| 
 | 
   237             None : practically, a CustomErr instance.
 | 
| 
 | 
   238         """
 | 
| 
 | 
   239         self.msg     = msg
 | 
| 
 | 
   240         self.details = details
 | 
| 
 | 
   241 
 | 
| 
 | 
   242         self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
 | 
| 
 | 
   243 
 | 
| 
 | 
   244     def throw(self, loggerPath = "") -> None:
 | 
| 
 | 
   245         """
 | 
| 
 | 
   246         Raises the current CustomErr instance, logging a warning message before doing so.
 | 
| 
 | 
   247 
 | 
| 
 | 
   248         Raises:
 | 
| 
 | 
   249             self: The current CustomErr instance.
 | 
| 
 | 
   250         
 | 
| 
 | 
   251         Returns:
 | 
| 
 | 
   252             None
 | 
| 
 | 
   253         """
 | 
| 
 | 
   254         if loggerPath: logWarning(str(self), loggerPath)
 | 
| 
 | 
   255         raise self
 | 
| 
 | 
   256 
 | 
| 
 | 
   257     def abort(self) -> None:
 | 
| 
 | 
   258         """
 | 
| 
 | 
   259         Aborts the execution of the script.
 | 
| 
 | 
   260         
 | 
| 
 | 
   261         Returns:
 | 
| 
 | 
   262             None
 | 
| 
 | 
   263         """
 | 
| 
 | 
   264         terminate(str(self))
 | 
| 
 | 
   265 
 | 
| 
 | 
   266     def __str__(self) -> str:
 | 
| 
 | 
   267         """
 | 
| 
 | 
   268         (Private) Returns a string representing the current CustomErr instance.
 | 
| 
 | 
   269 
 | 
| 
 | 
   270         Returns:
 | 
| 
 | 
   271             str: A string representing the current CustomErr instance.
 | 
| 
 | 
   272         """
 | 
| 
 | 
   273         return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
 | 
| 
 | 
   274 
 | 
| 
 | 
   275 class ArgsErr(CustomErr):
 | 
| 
 | 
   276     """
 | 
| 
 | 
   277     CustomErr subclass for UI arguments errors.
 | 
| 
 | 
   278     """
 | 
| 
 | 
   279     errName = "Args Error"
 | 
| 
 | 
   280     def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
 | 
| 
 | 
   281         super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
 | 
| 
 | 
   282 
 | 
| 
 | 
   283 class DataErr(CustomErr):
 | 
| 
 | 
   284     """
 | 
| 
 | 
   285     CustomErr subclass for data formatting errors.
 | 
| 
 | 
   286     """
 | 
| 
 | 
   287     errName = "Data Format Error"
 | 
| 
 | 
   288     def __init__(self, fileName :str, msg = "no further details provided") -> None:
 | 
| 
 | 
   289         super().__init__(f"file \"{fileName}\" contains malformed data", msg)
 | 
| 
 | 
   290 
 | 
| 
 | 
   291 class PathErr(CustomErr):
 | 
| 
 | 
   292     """
 | 
| 
 | 
   293     CustomErr subclass for filepath formatting errors.
 | 
| 
 | 
   294     """
 | 
| 
 | 
   295     errName = "Path Error"
 | 
| 
 | 
   296     def __init__(self, path :FilePath, msg = "no further details provided") -> None:
 | 
| 
 | 
   297         super().__init__(f"path \"{path}\" is invalid", msg)
 | 
| 
 | 
   298 
 | 
| 
 | 
   299 class ValueErr(CustomErr):
 | 
| 
 | 
   300     """
 | 
| 
 | 
   301     CustomErr subclass for any value error.
 | 
| 
 | 
   302     """
 | 
| 
 | 
   303     errName = "Value Error"
 | 
| 
 | 
   304     def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
 | 
| 
 | 
   305         super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
 | 
| 
 | 
   306 
 | 
| 
 | 
   307 # RESULT
 | 
| 
 | 
   308 T = TypeVar('T')
 | 
| 
 | 
   309 E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
 | 
| 
 | 
   310 class Result(Generic[T, E]):
 | 
| 
 | 
   311     class ResultErr(CustomErr):
 | 
| 
 | 
   312         """
 | 
| 
 | 
   313         CustomErr subclass for all Result errors.
 | 
| 
 | 
   314         """
 | 
| 
 | 
   315         errName = "Result Error"
 | 
| 
 | 
   316         def __init__(self, msg = "no further details provided") -> None:
 | 
| 
 | 
   317             super().__init__(msg)
 | 
| 
 | 
   318     """
 | 
| 
 | 
   319     Class to handle the result of an operation, with a value and a boolean flag to indicate
 | 
| 
 | 
   320     whether the operation was successful or not.
 | 
| 
 | 
   321     """
 | 
| 
 | 
   322     def __init__(self, value :Union[T, E], isOk :bool) -> None:
 | 
| 
 | 
   323         """
 | 
| 
 | 
   324         (Private) Initializes an instance of Result.
 | 
| 
 | 
   325 
 | 
| 
 | 
   326         Args:
 | 
| 
 | 
   327             value (Union[T, E]): The value to be stored in the Result instance.
 | 
| 
 | 
   328             isOk (bool): A boolean flag to indicate whether the operation was successful or not.
 | 
| 
 | 
   329         
 | 
| 
 | 
   330             Returns:
 | 
| 
 | 
   331                 None : practically, a Result instance.
 | 
| 
 | 
   332         """
 | 
| 
 | 
   333         self.isOk  = isOk
 | 
| 
 | 
   334         self.isErr = not isOk
 | 
| 
 | 
   335         self.value = value
 | 
| 
 | 
   336 
 | 
| 
 | 
   337     @classmethod
 | 
| 
 | 
   338     def Ok(cls,  value :T) -> "Result":
 | 
| 
 | 
   339         """
 | 
| 
 | 
   340         Constructs a new Result instance with a successful operation.
 | 
| 
 | 
   341 
 | 
| 
 | 
   342         Args:
 | 
| 
 | 
   343             value (T): The value to be stored in the Result instance, set as successful.
 | 
| 
 | 
   344 
 | 
| 
 | 
   345         Returns:
 | 
| 
 | 
   346             Result: A new Result instance with a successful operation.
 | 
| 
 | 
   347         """
 | 
| 
 | 
   348         return Result(value, isOk = True)
 | 
| 
 | 
   349     
 | 
| 
 | 
   350     @classmethod
 | 
| 
 | 
   351     def Err(cls, value :E) -> "Result": 
 | 
| 
 | 
   352         """
 | 
| 
 | 
   353         Constructs a new Result instance with a failed operation.
 | 
| 
 | 
   354 
 | 
| 
 | 
   355         Args:
 | 
| 
 | 
   356             value (E): The value to be stored in the Result instance, set as failed.
 | 
| 
 | 
   357 
 | 
| 
 | 
   358         Returns:
 | 
| 
 | 
   359             Result: A new Result instance with a failed operation.
 | 
| 
 | 
   360         """
 | 
| 
 | 
   361         return Result(value, isOk = False)
 | 
| 
 | 
   362 
 | 
| 
 | 
   363     def unwrap(self) -> T:
 | 
| 
 | 
   364         """
 | 
| 
 | 
   365         Unwraps the value of the Result instance, if the operation was successful.
 | 
| 
 | 
   366 
 | 
| 
 | 
   367         Raises:
 | 
| 
 | 
   368             ResultErr: If the operation was not successful.
 | 
| 
 | 
   369 
 | 
| 
 | 
   370         Returns:
 | 
| 
 | 
   371             T: The value of the Result instance, if the operation was successful.
 | 
| 
 | 
   372         """
 | 
| 
 | 
   373         if self.isOk: return self.value
 | 
| 
 | 
   374         raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
 | 
| 
 | 
   375 
 | 
| 
 | 
   376     def unwrapOr(self, default :T) -> T:
 | 
| 
 | 
   377         """
 | 
| 
 | 
   378         Unwraps the value of the Result instance, if the operation was successful, otherwise
 | 
| 
 | 
   379         it returns a default value.
 | 
| 
 | 
   380 
 | 
| 
 | 
   381         Args:
 | 
| 
 | 
   382             default (T): The default value to be returned if the operation was not successful.
 | 
| 
 | 
   383 
 | 
| 
 | 
   384         Returns:
 | 
| 
 | 
   385             T: The value of the Result instance, if the operation was successful,
 | 
| 
 | 
   386             otherwise the default value.
 | 
| 
 | 
   387         """
 | 
| 
 | 
   388         return self.value if self.isOk else default
 | 
| 
 | 
   389     
 | 
| 
 | 
   390     def expect(self, err :"Result.ResultErr") -> T:
 | 
| 
 | 
   391         """
 | 
| 
 | 
   392         Expects that the value of the Result instance is successful, otherwise it raises an error.
 | 
| 
 | 
   393 
 | 
| 
 | 
   394         Args:
 | 
| 
 | 
   395             err (Exception): The error to be raised if the operation was not successful.
 | 
| 
 | 
   396 
 | 
| 
 | 
   397         Raises:
 | 
| 
 | 
   398             err: The error raised if the operation was not successful.
 | 
| 
 | 
   399 
 | 
| 
 | 
   400         Returns:
 | 
| 
 | 
   401             T: The value of the Result instance, if the operation was successful.
 | 
| 
 | 
   402         """
 | 
| 
 | 
   403         if self.isOk: return self.value
 | 
| 
 | 
   404         raise err
 | 
| 
 | 
   405 
 | 
| 
 | 
   406     U = TypeVar("U")
 | 
| 
 | 
   407     def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
 | 
| 
 | 
   408         """
 | 
| 
 | 
   409         Maps the value of the current Result to whatever is returned by the mapper function.
 | 
| 
 | 
   410         If the Result contained an unsuccessful operation to begin with it remains unchanged
 | 
| 
 | 
   411         (a reference to the current instance is returned).
 | 
| 
 | 
   412         If the mapper function panics the returned result instance will be of the error kind.
 | 
| 
 | 
   413 
 | 
| 
 | 
   414         Args:
 | 
| 
 | 
   415             mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
 | 
| 
 | 
   416 
 | 
| 
 | 
   417         Returns:
 | 
| 
 | 
   418             Result[U, E]: The result of the mapper operation applied to the Result value.
 | 
| 
 | 
   419         """
 | 
| 
 | 
   420         if self.isErr: return self
 | 
| 
 | 
   421         try: return Result.Ok(mapper(self.value))
 | 
| 
 | 
   422         except Exception as e: return Result.Err(e)
 | 
| 
 | 
   423     
 | 
| 
 | 
   424     D = TypeVar("D", bound = "Result.ResultErr")
 | 
| 
 | 
   425     def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
 | 
| 
 | 
   426         """
 | 
| 
 | 
   427         Maps the error of the current Result to whatever is returned by the mapper function.
 | 
| 
 | 
   428         If the Result contained a successful operation it remains unchanged
 | 
| 
 | 
   429         (a reference to the current instance is returned).
 | 
| 
 | 
   430         If the mapper function panics this method does as well.
 | 
| 
 | 
   431 
 | 
| 
 | 
   432         Args:
 | 
| 
 | 
   433             mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
 | 
| 
 | 
   434 
 | 
| 
 | 
   435         Returns:
 | 
| 
 | 
   436             Result[U, E]: The result of the mapper operation applied to the Result error.
 | 
| 
 | 
   437         """
 | 
| 
 | 
   438         if self.isOk: return self
 | 
| 
 | 
   439         return Result.Err(mapper(self.value))
 | 
| 
 | 
   440 
 | 
| 
 | 
   441     def __str__(self):
 | 
| 
 | 
   442         return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
 | 
| 
 | 
   443 
 | 
| 
 | 
   444 # FILES
 | 
| 
 | 
   445 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
 | 
| 
 | 
   446     """
 | 
| 
 | 
   447     Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
 | 
| 
 | 
   448 
 | 
| 
 | 
   449     Args:
 | 
| 
 | 
   450         path : the path to the dataset file.
 | 
| 
 | 
   451         datasetName : the name of the dataset.
 | 
| 
 | 
   452 
 | 
| 
 | 
   453     Raises:
 | 
| 
 | 
   454         DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
 | 
| 
 | 
   455         it has less than 2 columns.
 | 
| 
 | 
   456     
 | 
| 
 | 
   457     Returns:
 | 
| 
 | 
   458         pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
 | 
| 
 | 
   459     """
 | 
| 
 | 
   460     # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
 | 
| 
 | 
   461     # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
 | 
| 
 | 
   462     # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
 | 
| 
 | 
   463     # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
 | 
| 
 | 
   464     # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
 | 
| 
 | 
   465     # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
 | 
| 
 | 
   466     try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
 | 
| 
 | 
   467     except:
 | 
| 
 | 
   468         try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
 | 
| 
 | 
   469         except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
 | 
| 
 | 
   470     
 | 
| 
 | 
   471     if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
 | 
| 
 | 
   472     return dataset
 | 
| 
 | 
   473 
 | 
| 
 | 
   474 def readPickle(path :FilePath) -> Any:
 | 
| 
 | 
   475     """
 | 
| 
 | 
   476     Reads the contents of a .pickle file, which needs to exist at the given path.
 | 
| 
 | 
   477 
 | 
| 
 | 
   478     Args:
 | 
| 
 | 
   479         path : the path to the .pickle file.
 | 
| 
 | 
   480     
 | 
| 
 | 
   481     Returns:
 | 
| 
 | 
   482         Any : the data inside a pickle file, could be anything.
 | 
| 
 | 
   483     """
 | 
| 
 | 
   484     with open(path.show(), "rb") as fd: return pickle.load(fd)
 | 
| 
 | 
   485 
 | 
| 
 | 
   486 def writePickle(path :FilePath, data :Any) -> None:
 | 
| 
 | 
   487     """
 | 
| 
 | 
   488     Saves any data in a .pickle file, created at the given path.
 | 
| 
 | 
   489 
 | 
| 
 | 
   490     Args:
 | 
| 
 | 
   491         path : the path to the .pickle file.
 | 
| 
 | 
   492         data : the data to be written to the file.
 | 
| 
 | 
   493     
 | 
| 
 | 
   494     Returns:
 | 
| 
 | 
   495         None
 | 
| 
 | 
   496     """
 | 
| 
 | 
   497     with open(path.show(), "wb") as fd: pickle.dump(data, fd)
 | 
| 
 | 
   498 
 | 
| 
 | 
   499 def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
 | 
| 
 | 
   500     """
 | 
| 
 | 
   501     Reads the contents of a .csv file, which needs to exist at the given path.
 | 
| 
 | 
   502 
 | 
| 
 | 
   503     Args:
 | 
| 
 | 
   504         path : the path to the .csv file.
 | 
| 
 | 
   505         delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
 | 
| 
 | 
   506         skipHeader : whether the first row of the file is a header and should be skipped.
 | 
| 
 | 
   507     
 | 
| 
 | 
   508     Returns:
 | 
| 
 | 
   509         List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
 | 
| 
 | 
   510     """
 | 
| 
 | 
   511     with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
 | 
| 
 | 
   512 
 | 
| 
 | 
   513 def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
 | 
| 
 | 
   514     """
 | 
| 
 | 
   515     Reads the contents of a .svg file, which needs to exist at the given path.
 | 
| 
 | 
   516 
 | 
| 
 | 
   517     Args:
 | 
| 
 | 
   518         path : the path to the .svg file.
 | 
| 
 | 
   519     
 | 
| 
 | 
   520     Raises:
 | 
| 
 | 
   521         DataErr : if the map is malformed.
 | 
| 
 | 
   522     
 | 
| 
 | 
   523     Returns:
 | 
| 
 | 
   524         Any : the data inside a svg file, could be anything.
 | 
| 
 | 
   525     """
 | 
| 
 | 
   526     try: return ET.parse(path.show())
 | 
| 
 | 
   527     except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
 | 
| 
 | 
   528         raise customErr if customErr else err
 | 
| 
 | 
   529 
 | 
| 
 | 
   530 def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
 | 
| 
 | 
   531     """
 | 
| 
 | 
   532     Saves svg data opened with lxml.etree in a .svg file, created at the given path.
 | 
| 
 | 
   533 
 | 
| 
 | 
   534     Args:
 | 
| 
 | 
   535         path : the path to the .svg file.
 | 
| 
 | 
   536         data : the data to be written to the file.
 | 
| 
 | 
   537     
 | 
| 
 | 
   538     Returns:
 | 
| 
 | 
   539         None
 | 
| 
 | 
   540     """
 | 
| 
 | 
   541     with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
 | 
| 
 | 
   542 
 | 
| 
 | 
   543 # UI ARGUMENTS
 | 
| 
 | 
   544 class Bool:
 | 
| 
 | 
   545     def __init__(self, argName :str) -> None:
 | 
| 
 | 
   546         self.argName = argName
 | 
| 
 | 
   547 
 | 
| 
 | 
   548     def __call__(self, s :str) -> bool: return self.check(s)
 | 
| 
 | 
   549 
 | 
| 
 | 
   550     def check(self, s :str) -> bool:
 | 
| 
 | 
   551         s = s.lower()
 | 
| 
 | 
   552         if s == "true" : return True
 | 
| 
 | 
   553         if s == "false": return False
 | 
| 
 | 
   554         raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
 | 
| 
 | 
   555 
 | 
| 
 | 
   556 class Float:
 | 
| 
 | 
   557     def __init__(self, argName = "Dataset values, not an argument") -> None:
 | 
| 
 | 
   558         self.argName = argName
 | 
| 
 | 
   559     
 | 
| 
 | 
   560     def __call__(self, s :str) -> float: return self.check(s)
 | 
| 
 | 
   561 
 | 
| 
 | 
   562     def check(self, s :str) -> float:
 | 
| 
 | 
   563         try: return float(s)
 | 
| 
 | 
   564         except ValueError:
 | 
| 
 | 
   565             s = s.lower()
 | 
| 
 | 
   566             if s == "nan" or s == "none": return math.nan
 | 
| 
 | 
   567             raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
 | 
| 
 | 
   568 
 | 
| 
 | 
   569 # MODELS
 | 
| 
 | 
   570 OldRule = List[Union[str, "OldRule"]]
 | 
| 
 | 
   571 class Model(Enum):
 | 
| 
 | 
   572     """
 | 
| 
 | 
   573     Represents a metabolic model, either custom or locally supported. Custom models don't point
 | 
| 
 | 
   574     to valid file paths.
 | 
| 
 | 
   575     """
 | 
| 
 | 
   576 
 | 
| 
 | 
   577     Recon   = "Recon"
 | 
| 
 | 
   578     ENGRO2  = "ENGRO2"
 | 
| 
 | 
   579     ENGRO2_no_legend = "ENGRO2_no_legend"
 | 
| 
 | 
   580     HMRcore = "HMRcore"
 | 
| 
 | 
   581     HMRcore_no_legend = "HMRcore_no_legend"
 | 
| 
 | 
   582     Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
 | 
| 
 | 
   583 
 | 
| 
 | 
   584     def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
 | 
| 
 | 
   585         if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
 | 
| 
 | 
   586 
 | 
| 
 | 
   587     def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
 | 
| 
 | 
   588         """
 | 
| 
 | 
   589         Open "rules" file for this model.
 | 
| 
 | 
   590 
 | 
| 
 | 
   591         Returns:
 | 
| 
 | 
   592             Dict[str, Dict[str, OldRule]] : the rules for this model.
 | 
| 
 | 
   593         """
 | 
| 
 | 
   594         path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
 | 
| 
 | 
   595         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   596         return readPickle(path)
 | 
| 
 | 
   597     
 | 
| 
 | 
   598     def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
 | 
| 
 | 
   599         """
 | 
| 
 | 
   600         Open "gene translator (old: gene_in_rule)" file for this model.
 | 
| 
 | 
   601 
 | 
| 
 | 
   602         Returns:
 | 
| 
 | 
   603             Dict[str, Dict[str, str]] : the translator dict for this model.
 | 
| 
 | 
   604         """
 | 
| 
 | 
   605         path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
 | 
| 
 | 
   606         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   607         return readPickle(path)
 | 
| 
 | 
   608     
 | 
| 
 | 
   609     def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
 | 
| 
 | 
   610         path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
 | 
| 
 | 
   611         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   612         return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
 | 
| 
 | 
   613     
 | 
| 
 | 
   614     def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
 | 
| 
 | 
   615         if(self is Model.Custom):
 | 
| 
 | 
   616             return self.load_custom_model(customPath, customExtension)
 | 
| 
 | 
   617         else:
 | 
| 
 | 
   618             return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
 | 
| 
 | 
   619         
 | 
| 
 | 
   620     def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
 | 
| 
 | 
   621         ext = ext if ext else file_path.ext
 | 
| 
 | 
   622         try:
 | 
| 
 | 
   623             if str(ext) in FileFormat.XML.value:
 | 
| 
 | 
   624                 return cobra.io.read_sbml_model(file_path.show())
 | 
| 
 | 
   625             
 | 
| 
 | 
   626             if str(ext) in FileFormat.JSON.value:
 | 
| 
 | 
   627                 # Compressed files are not automatically handled by cobra
 | 
| 
 | 
   628                 if(ext == "json"):
 | 
| 
 | 
   629                     return cobra.io.load_json_model(file_path.show())
 | 
| 
 | 
   630                 else: 
 | 
| 
 | 
   631                     return self.extract_model(file_path, ext, "json")
 | 
| 
 | 
   632 
 | 
| 
 | 
   633             if str(ext) in FileFormat.MAT.value:
 | 
| 
 | 
   634                 # Compressed files are not automatically handled by cobra
 | 
| 
 | 
   635                 if(ext == "mat"):
 | 
| 
 | 
   636                     return cobra.io.load_matlab_model(file_path.show())
 | 
| 
 | 
   637                 else: 
 | 
| 
 | 
   638                     return self.extract_model(file_path, ext, "mat")
 | 
| 
 | 
   639 
 | 
| 
 | 
   640             if str(ext) in FileFormat.YML.value:
 | 
| 
 | 
   641                 # Compressed files are not automatically handled by cobra
 | 
| 
 | 
   642                 if(ext == "yml"):
 | 
| 
 | 
   643                     return cobra.io.load_yaml_model(file_path.show())
 | 
| 
 | 
   644                 else: 
 | 
| 
 | 
   645                     return self.extract_model(file_path, ext, "yml")
 | 
| 
 | 
   646 
 | 
| 
 | 
   647         except Exception as e: raise DataErr(file_path, e.__str__())
 | 
| 
 | 
   648         raise DataErr(file_path,
 | 
| 
 | 
   649             f"Fomat \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")
 | 
| 
 | 
   650     
 | 
| 
 | 
   651 
 | 
| 
 | 
   652     def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
 | 
| 
 | 
   653         """
 | 
| 
 | 
   654         Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).
 | 
| 
 | 
   655         
 | 
| 
 | 
   656         Args:
 | 
| 
 | 
   657             file_path: File path of the model
 | 
| 
 | 
   658             ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
 | 
| 
 | 
   659             
 | 
| 
 | 
   660         Returns:
 | 
| 
 | 
   661             cobra.Model: COBRApy model 
 | 
| 
 | 
   662             
 | 
| 
 | 
   663         Raises:
 | 
| 
 | 
   664             Exception: Extraction errors
 | 
| 
 | 
   665         """
 | 
| 
 | 
   666         ext_str = str(ext)
 | 
| 
 | 
   667 
 | 
| 
 | 
   668         try:
 | 
| 
 | 
   669             if '.zip' in ext_str:
 | 
| 
 | 
   670                 with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
 | 
| 
 | 
   671                     with zip_ref.open(zip_ref.namelist()[0]) as json_file:
 | 
| 
 | 
   672                         content = json_file.read().decode('utf-8')
 | 
| 
 | 
   673                         if model_encoding == "json":
 | 
| 
 | 
   674                             return cobra.io.load_json_model(StringIO(content))
 | 
| 
 | 
   675                         elif model_encoding == "mat":
 | 
| 
 | 
   676                             return cobra.io.load_matlab_model(StringIO(content))
 | 
| 
 | 
   677                         elif model_encoding == "yml":
 | 
| 
 | 
   678                             return cobra.io.load_yaml_model(StringIO(content))
 | 
| 
 | 
   679                         else:
 | 
| 
 | 
   680                             raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
 | 
| 
 | 
   681             elif '.gz' in ext_str:
 | 
| 
 | 
   682                 with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
 | 
| 
 | 
   683                     if model_encoding == "json":
 | 
| 
 | 
   684                         return cobra.io.load_json_model(gz_ref)
 | 
| 
 | 
   685                     elif model_encoding == "mat":
 | 
| 
 | 
   686                         return cobra.io.load_matlab_model(gz_ref)
 | 
| 
 | 
   687                     elif model_encoding == "yml":
 | 
| 
 | 
   688                         return cobra.io.load_yaml_model(gz_ref)
 | 
| 
 | 
   689                     else:
 | 
| 
 | 
   690                         raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
 | 
| 
 | 
   691             elif '.bz2' in ext_str:
 | 
| 
 | 
   692                 with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
 | 
| 
 | 
   693                     if model_encoding == "json":
 | 
| 
 | 
   694                         return cobra.io.load_json_model(bz2_ref)
 | 
| 
 | 
   695                     elif model_encoding == "mat":
 | 
| 
 | 
   696                         return cobra.io.load_matlab_model(bz2_ref)
 | 
| 
 | 
   697                     elif model_encoding == "yml":
 | 
| 
 | 
   698                         return cobra.io.load_yaml_model(bz2_ref)
 | 
| 
 | 
   699                     else:
 | 
| 
 | 
   700                         raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
 | 
| 
 | 
   701             else:
 | 
| 
 | 
   702                 raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")
 | 
| 
 | 
   703             
 | 
| 
 | 
   704         except Exception as e:
 | 
| 
 | 
   705             raise Exception(f"Error during model extraction: {str(e)}")
 | 
| 
 | 
   706         
 | 
| 
 | 
   707 
 | 
| 
 | 
   708 
 | 
| 
394
 | 
   709     def __str__(self) -> str: return self.value
 | 
| 
 | 
   710 
 | 
| 
 | 
   711 
 | 
| 
 | 
   712 def convert_genes(model,annotation):
 | 
| 
 | 
   713     from cobra.manipulation import rename_genes
 | 
| 
 | 
   714     model2=model.copy()
 | 
| 
 | 
   715     try:
 | 
| 
 | 
   716         dict_genes={gene.id:gene.notes[annotation]  for gene in model2.genes}
 | 
| 
 | 
   717     except:
 | 
| 
 | 
   718         print("No annotation in gene dict!")
 | 
| 
 | 
   719         return -1
 | 
| 
 | 
   720     rename_genes(model2,dict_genes)
 | 
| 
 | 
   721 
 | 
| 
408
 | 
   722     return model2
 | 
| 
 | 
   723 
 | 
| 
 | 
   724 
 | 
| 
409
 | 
   725 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobra.Model:
 | 
| 
408
 | 
   726     """
 | 
| 
 | 
   727     Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
 | 
| 
 | 
   728     
 | 
| 
 | 
   729     Args:
 | 
| 
 | 
   730         csv_path: Path al file CSV (separato da tab)
 | 
| 
 | 
   731         model_id: ID del modello da creare
 | 
| 
 | 
   732         
 | 
| 
 | 
   733     Returns:
 | 
| 
 | 
   734         cobra.Model: Il modello COBRApy costruito
 | 
| 
 | 
   735     """
 | 
| 
 | 
   736     
 | 
| 
 | 
   737     # Leggi i dati dal CSV
 | 
| 
 | 
   738     df = pd.read_csv(csv_path, sep='\t')
 | 
| 
 | 
   739     
 | 
| 
 | 
   740     # Crea il modello vuoto
 | 
| 
409
 | 
   741     model = cobraModel(model_id)
 | 
| 
408
 | 
   742     
 | 
| 
 | 
   743     # Dict per tenere traccia di metaboliti e compartimenti
 | 
| 
 | 
   744     metabolites_dict = {}
 | 
| 
 | 
   745     compartments_dict = {}
 | 
| 
 | 
   746     
 | 
| 
 | 
   747     print(f"Costruendo modello da {len(df)} reazioni...")
 | 
| 
 | 
   748     
 | 
| 
 | 
   749     # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
 | 
| 
 | 
   750     for idx, row in df.iterrows():
 | 
| 
 | 
   751         reaction_formula = str(row['Reaction']).strip()
 | 
| 
 | 
   752         if not reaction_formula or reaction_formula == 'nan':
 | 
| 
 | 
   753             continue
 | 
| 
 | 
   754             
 | 
| 
 | 
   755         # Estrai metaboliti dalla formula della reazione
 | 
| 
 | 
   756         metabolites = extract_metabolites_from_reaction(reaction_formula)
 | 
| 
 | 
   757         
 | 
| 
 | 
   758         for met_id in metabolites:
 | 
| 
 | 
   759             compartment = extract_compartment_from_metabolite(met_id)
 | 
| 
 | 
   760             
 | 
| 
 | 
   761             # Aggiungi compartimento se non esiste
 | 
| 
 | 
   762             if compartment not in compartments_dict:
 | 
| 
 | 
   763                 compartments_dict[compartment] = compartment
 | 
| 
 | 
   764             
 | 
| 
 | 
   765             # Aggiungi metabolita se non esiste
 | 
| 
 | 
   766             if met_id not in metabolites_dict:
 | 
| 
 | 
   767                 metabolites_dict[met_id] = Metabolite(
 | 
| 
 | 
   768                     id=met_id,
 | 
| 
 | 
   769                     compartment=compartment,
 | 
| 
 | 
   770                     name=met_id.replace(f"_{compartment}", "").replace("__", "_")
 | 
| 
 | 
   771                 )
 | 
| 
 | 
   772     
 | 
| 
 | 
   773     # Aggiungi compartimenti al modello
 | 
| 
 | 
   774     model.compartments = compartments_dict
 | 
| 
 | 
   775     
 | 
| 
 | 
   776     # Aggiungi metaboliti al modello  
 | 
| 
 | 
   777     model.add_metabolites(list(metabolites_dict.values()))
 | 
| 
 | 
   778     
 | 
| 
 | 
   779     print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
 | 
| 
 | 
   780     
 | 
| 
 | 
   781     # Seconda passata: aggiungi le reazioni
 | 
| 
 | 
   782     reactions_added = 0
 | 
| 
 | 
   783     
 | 
| 
 | 
   784     for idx, row in df.iterrows():
 | 
| 
412
 | 
   785         reaction_id = str(row['ReactionID']).strip()
 | 
| 
 | 
   786         reaction_formula = str(row['Reaction']).strip()
 | 
| 
 | 
   787         
 | 
| 
 | 
   788         # Salta reazioni senza formula
 | 
| 
 | 
   789         if not reaction_formula or reaction_formula == 'nan':
 | 
| 
 | 
   790             raise ValueError(f"Formula della reazione mancante {reaction_id}")
 | 
| 
 | 
   791 
 | 
| 
 | 
   792         # Crea la reazione
 | 
| 
 | 
   793         reaction = Reaction(reaction_id)
 | 
| 
 | 
   794         reaction.name = reaction_id
 | 
| 
 | 
   795         
 | 
| 
 | 
   796         # Imposta bounds
 | 
| 
 | 
   797         reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
 | 
| 
 | 
   798         reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
 | 
| 
 | 
   799         
 | 
| 
 | 
   800         # Aggiungi gene rule se presente
 | 
| 
 | 
   801         if pd.notna(row['Rule']) and str(row['Rule']).strip():
 | 
| 
 | 
   802             reaction.gene_reaction_rule = str(row['Rule']).strip()
 | 
| 
 | 
   803         
 | 
| 
 | 
   804         # Parse della formula della reazione
 | 
| 
408
 | 
   805         try:
 | 
| 
412
 | 
   806             parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
 | 
| 
408
 | 
   807         except Exception as e:
 | 
| 
412
 | 
   808             print(f"Errore nel parsing della reazione {reaction_id}: {e}")
 | 
| 
408
 | 
   809             reactions_skipped += 1
 | 
| 
 | 
   810             continue
 | 
| 
412
 | 
   811         
 | 
| 
 | 
   812         # Aggiungi la reazione al modello
 | 
| 
 | 
   813         model.add_reactions([reaction])
 | 
| 
 | 
   814         reactions_added += 1
 | 
| 
 | 
   815             
 | 
| 
408
 | 
   816     
 | 
| 
 | 
   817     print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
 | 
| 
 | 
   818     
 | 
| 
 | 
   819     # Imposta l'obiettivo di biomassa
 | 
| 
 | 
   820     set_biomass_objective(model)
 | 
| 
 | 
   821     
 | 
| 
 | 
   822     # Imposta il medium
 | 
| 
 | 
   823     set_medium_from_data(model, df)
 | 
| 
 | 
   824     
 | 
| 
 | 
   825     print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
 | 
| 
 | 
   826     
 | 
| 
 | 
   827     return model
 | 
| 
 | 
   828 
 | 
| 
 | 
   829 
 | 
| 
 | 
   830 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
 | 
| 
 | 
   831 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
 | 
| 
 | 
   832     """
 | 
| 
 | 
   833     Estrae gli ID dei metaboliti da una formula di reazione.
 | 
| 
 | 
   834     Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
 | 
| 
 | 
   835     e permette che comincino con cifre o underscore.
 | 
| 
 | 
   836     """
 | 
| 
 | 
   837     metabolites = set()
 | 
| 
 | 
   838     # coefficiente opzionale seguito da un token che termina con _<letters>
 | 
| 
 | 
   839     pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
 | 
| 
 | 
   840     matches = re.findall(pattern, reaction_formula)
 | 
| 
 | 
   841     metabolites.update(matches)
 | 
| 
 | 
   842     return metabolites
 | 
| 
 | 
   843 
 | 
| 
 | 
   844 
 | 
| 
 | 
   845 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
 | 
| 
 | 
   846     """
 | 
| 
 | 
   847     Estrae il compartimento dall'ID del metabolita.
 | 
| 
 | 
   848     """
 | 
| 
 | 
   849     # Il compartimento è solitamente l'ultima lettera dopo l'underscore
 | 
| 
 | 
   850     if '_' in metabolite_id:
 | 
| 
 | 
   851         return metabolite_id.split('_')[-1]
 | 
| 
 | 
   852     return 'c'  # default cytoplasm
 | 
| 
 | 
   853 
 | 
| 
 | 
   854 
 | 
| 
 | 
   855 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
 | 
| 
 | 
   856     """
 | 
| 
 | 
   857     Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
 | 
| 
 | 
   858     """
 | 
| 
 | 
   859 
 | 
| 
 | 
   860     if reaction.id == 'EX_thbpt_e':
 | 
| 
 | 
   861         print(reaction.id)
 | 
| 
 | 
   862         print(formula)
 | 
| 
 | 
   863     # Dividi in parte sinistra e destra
 | 
| 
 | 
   864     if '<=>' in formula:
 | 
| 
 | 
   865         left, right = formula.split('<=>')
 | 
| 
 | 
   866         reversible = True
 | 
| 
 | 
   867     elif '<--' in formula:
 | 
| 
 | 
   868         left, right = formula.split('<--')
 | 
| 
 | 
   869         reversible = False
 | 
| 
 | 
   870         left, right = left, right
 | 
| 
 | 
   871     elif '-->' in formula:
 | 
| 
 | 
   872         left, right = formula.split('-->')
 | 
| 
 | 
   873         reversible = False
 | 
| 
 | 
   874     elif '<-' in formula:
 | 
| 
 | 
   875         left, right = formula.split('<-')
 | 
| 
 | 
   876         reversible = False
 | 
| 
 | 
   877         left, right = left, right
 | 
| 
 | 
   878     else:
 | 
| 
 | 
   879         raise ValueError(f"Formato reazione non riconosciuto: {formula}")
 | 
| 
 | 
   880     
 | 
| 
 | 
   881     # Parse dei metaboliti e coefficienti
 | 
| 
 | 
   882     reactants = parse_metabolites_side(left.strip())
 | 
| 
 | 
   883     products = parse_metabolites_side(right.strip())
 | 
| 
 | 
   884     
 | 
| 
 | 
   885     # Aggiungi metaboliti alla reazione
 | 
| 
 | 
   886     metabolites_to_add = {}
 | 
| 
 | 
   887     
 | 
| 
 | 
   888     # Reagenti (coefficienti negativi)
 | 
| 
 | 
   889     for met_id, coeff in reactants.items():
 | 
| 
 | 
   890         if met_id in metabolites_dict:
 | 
| 
 | 
   891             metabolites_to_add[metabolites_dict[met_id]] = -coeff
 | 
| 
 | 
   892     
 | 
| 
 | 
   893     # Prodotti (coefficienti positivi)
 | 
| 
 | 
   894     for met_id, coeff in products.items():
 | 
| 
 | 
   895         if met_id in metabolites_dict:
 | 
| 
 | 
   896             metabolites_to_add[metabolites_dict[met_id]] = coeff
 | 
| 
 | 
   897     
 | 
| 
 | 
   898     reaction.add_metabolites(metabolites_to_add)
 | 
| 
 | 
   899 
 | 
| 
 | 
   900 
 | 
| 
 | 
   901 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
 | 
| 
 | 
   902     """
 | 
| 
 | 
   903     Parsa un lato della reazione per estrarre metaboliti e coefficienti.
 | 
| 
 | 
   904     """
 | 
| 
 | 
   905     metabolites = {}
 | 
| 
 | 
   906     if not side_str or side_str.strip() == '':
 | 
| 
 | 
   907         return metabolites
 | 
| 
 | 
   908 
 | 
| 
 | 
   909     terms = side_str.split('+')
 | 
| 
 | 
   910     for term in terms:
 | 
| 
 | 
   911         term = term.strip()
 | 
| 
 | 
   912         if not term:
 | 
| 
 | 
   913             continue
 | 
| 
 | 
   914 
 | 
| 
 | 
   915         # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
 | 
| 
 | 
   916         match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
 | 
| 
 | 
   917         if match:
 | 
| 
 | 
   918             coeff_str, met_id = match.groups()
 | 
| 
 | 
   919             coeff = float(coeff_str) if coeff_str else 1.0
 | 
| 
 | 
   920             metabolites[met_id] = coeff
 | 
| 
 | 
   921 
 | 
| 
 | 
   922     return metabolites
 | 
| 
 | 
   923 
 | 
| 
 | 
   924 
 | 
| 
 | 
   925 
 | 
| 
 | 
   926 def set_biomass_objective(model: Model):
 | 
| 
 | 
   927     """
 | 
| 
 | 
   928     Imposta la reazione di biomassa come obiettivo.
 | 
| 
 | 
   929     """
 | 
| 
 | 
   930     biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
 | 
| 
 | 
   931     
 | 
| 
 | 
   932     if biomass_reactions:
 | 
| 
 | 
   933         model.objective = biomass_reactions[0].id
 | 
| 
 | 
   934         print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
 | 
| 
 | 
   935     else:
 | 
| 
 | 
   936         print("Nessuna reazione di biomassa trovata")
 | 
| 
 | 
   937 
 | 
| 
 | 
   938 
 | 
| 
 | 
   939 def set_medium_from_data(model: Model, df: pd.DataFrame):
 | 
| 
 | 
   940     """
 | 
| 
 | 
   941     Imposta il medium basato sulla colonna InMedium.
 | 
| 
 | 
   942     """
 | 
| 
 | 
   943     medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
 | 
| 
 | 
   944     
 | 
| 
 | 
   945     medium_dict = {}
 | 
| 
 | 
   946     for rxn_id in medium_reactions:
 | 
| 
 | 
   947         if rxn_id in [r.id for r in model.reactions]:
 | 
| 
 | 
   948             reaction = model.reactions.get_by_id(rxn_id)
 | 
| 
 | 
   949             if reaction.lower_bound < 0:  # Solo reazioni di uptake
 | 
| 
 | 
   950                 medium_dict[rxn_id] = abs(reaction.lower_bound)
 | 
| 
 | 
   951     
 | 
| 
 | 
   952     if medium_dict:
 | 
| 
 | 
   953         model.medium = medium_dict
 | 
| 
 | 
   954         print(f"Medium impostato con {len(medium_dict)} componenti")
 | 
| 
 | 
   955 
 | 
| 
 | 
   956 
 | 
| 
 | 
   957 def validate_model(model: Model) -> Dict[str, any]:
 | 
| 
 | 
   958     """
 | 
| 
 | 
   959     Valida il modello e fornisce statistiche di base.
 | 
| 
 | 
   960     """
 | 
| 
 | 
   961     validation = {
 | 
| 
 | 
   962         'num_reactions': len(model.reactions),
 | 
| 
 | 
   963         'num_metabolites': len(model.metabolites),
 | 
| 
 | 
   964         'num_genes': len(model.genes),
 | 
| 
 | 
   965         'num_compartments': len(model.compartments),
 | 
| 
 | 
   966         'objective': str(model.objective),
 | 
| 
 | 
   967         'medium_size': len(model.medium),
 | 
| 
 | 
   968         'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
 | 
| 
 | 
   969         'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
 | 
| 
 | 
   970     }
 | 
| 
 | 
   971     
 | 
| 
 | 
   972     try:
 | 
| 
 | 
   973         # Test di crescita
 | 
| 
 | 
   974         solution = model.optimize()
 | 
| 
 | 
   975         validation['growth_rate'] = solution.objective_value
 | 
| 
 | 
   976         validation['status'] = solution.status
 | 
| 
 | 
   977     except Exception as e:
 | 
| 
 | 
   978         validation['growth_rate'] = None
 | 
| 
 | 
   979         validation['status'] = f"Error: {e}"
 | 
| 
 | 
   980     
 | 
| 
 | 
   981     return validation
 | 
| 
411
 | 
   982 
 | 
| 
 | 
   983 
 | 
| 
 | 
   984 ################################- DATA GENERATION -################################
 | 
| 
 | 
# Alias of str used in annotations where a string is a reaction id.
ReactionId = str
 | 
| 
 | 
   986 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
 | 
| 
 | 
   987     """
 | 
| 
 | 
   988     Generates a dictionary mapping reaction ids to rules from the model.
 | 
| 
 | 
   989 
 | 
| 
 | 
   990     Args:
 | 
| 
 | 
   991         model : the model to derive data from.
 | 
| 
 | 
   992         asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.
 | 
| 
 | 
   993 
 | 
| 
 | 
   994     Returns:
 | 
| 
 | 
   995         Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
 | 
| 
 | 
   996         Dict[ReactionId, str] : the generated dictionary of raw rules.
 | 
| 
 | 
   997     """
 | 
| 
 | 
   998     # Is the below approach convoluted? yes
 | 
| 
 | 
   999     # Ok but is it inefficient? probably
 | 
| 
 | 
  1000     # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
 | 
| 
 | 
  1001     _ruleGetter   =  lambda reaction : reaction.gene_reaction_rule
 | 
| 
 | 
  1002     ruleExtractor = (lambda reaction :
 | 
| 
 | 
  1003         rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
 | 
| 
 | 
  1004 
 | 
| 
 | 
  1005     return {
 | 
| 
 | 
  1006         reaction.id : ruleExtractor(reaction)
 | 
| 
 | 
  1007         for reaction in model.reactions
 | 
| 
 | 
  1008         if reaction.gene_reaction_rule }
 | 
| 
 | 
  1009 
 | 
| 
 | 
  1010 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]:
 | 
| 
 | 
  1011     """
 | 
| 
 | 
  1012     Generates a dictionary mapping reaction ids to reaction formulas from the model.
 | 
| 
 | 
  1013 
 | 
| 
 | 
  1014     Args:
 | 
| 
 | 
  1015         model : the model to derive data from.
 | 
| 
 | 
  1016         asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are.
 | 
| 
 | 
  1017 
 | 
| 
 | 
  1018     Returns:
 | 
| 
 | 
  1019         Dict[ReactionId, str] : the generated dictionary.
 | 
| 
 | 
  1020     """
 | 
| 
 | 
  1021 
 | 
| 
 | 
  1022     unparsedReactions = {
 | 
| 
 | 
  1023         reaction.id : reaction.reaction
 | 
| 
 | 
  1024         for reaction in model.reactions
 | 
| 
 | 
  1025         if reaction.reaction 
 | 
| 
 | 
  1026     }
 | 
| 
 | 
  1027 
 | 
| 
 | 
  1028     if not asParsed: return unparsedReactions
 | 
| 
 | 
  1029     
 | 
| 
 | 
  1030     return reactionUtils.create_reaction_dict(unparsedReactions)
 | 
| 
 | 
  1031 
 | 
| 
 | 
  1032 def get_medium(model:cobra.Model) -> pd.DataFrame:
 | 
| 
 | 
  1033     trueMedium=[]
 | 
| 
 | 
  1034     for r in model.reactions:
 | 
| 
 | 
  1035         positiveCoeff=0
 | 
| 
 | 
  1036         for m in r.metabolites:
 | 
| 
 | 
  1037             if r.get_coefficient(m.id)>0:
 | 
| 
 | 
  1038                 positiveCoeff=1;
 | 
| 
 | 
  1039         if (positiveCoeff==0 and r.lower_bound<0):
 | 
| 
 | 
  1040             trueMedium.append(r.id)
 | 
| 
 | 
  1041 
 | 
| 
 | 
  1042     df_medium = pd.DataFrame()
 | 
| 
 | 
  1043     df_medium["reaction"] = trueMedium
 | 
| 
 | 
  1044     return df_medium
 | 
| 
 | 
  1045 
 | 
| 
 | 
  1046 def generate_bounds(model:cobra.Model) -> pd.DataFrame:
 | 
| 
 | 
  1047 
 | 
| 
 | 
  1048     rxns = []
 | 
| 
 | 
  1049     for reaction in model.reactions:
 | 
| 
 | 
  1050         rxns.append(reaction.id)
 | 
| 
 | 
  1051 
 | 
| 
 | 
  1052     bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
 | 
| 
 | 
  1053 
 | 
| 
 | 
  1054     for reaction in model.reactions:
 | 
| 
 | 
  1055         bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
 | 
| 
 | 
  1056     return bounds
 | 
| 
 | 
  1057 
 | 
| 
 | 
  1058 
 | 
| 
 | 
  1059 
 | 
| 
 | 
  1060 def generate_compartments(model: cobra.Model) -> pd.DataFrame:
 | 
| 
 | 
  1061     """
 | 
| 
 | 
  1062     Generates a DataFrame containing compartment information for each reaction.
 | 
| 
 | 
  1063     Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
 | 
| 
 | 
  1064     
 | 
| 
 | 
  1065     Args:
 | 
| 
 | 
  1066         model: the COBRA model to extract compartment data from.
 | 
| 
 | 
  1067         
 | 
| 
 | 
  1068     Returns:
 | 
| 
 | 
  1069         pd.DataFrame: DataFrame with ReactionID and compartment columns
 | 
| 
 | 
  1070     """
 | 
| 
 | 
  1071     pathway_data = []
 | 
| 
 | 
  1072 
 | 
| 
 | 
  1073     # First pass: determine the maximum number of pathways any reaction has
 | 
| 
 | 
  1074     max_pathways = 0
 | 
| 
 | 
  1075     reaction_pathways = {}
 | 
| 
 | 
  1076 
 | 
| 
 | 
  1077     for reaction in model.reactions:
 | 
| 
 | 
  1078         # Get unique pathways from all metabolites in the reaction
 | 
| 
 | 
  1079         if type(reaction.annotation['pathways']) == list:
 | 
| 
 | 
  1080             reaction_pathways[reaction.id] = reaction.annotation['pathways']
 | 
| 
 | 
  1081             max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
 | 
| 
 | 
  1082         else:
 | 
| 
 | 
  1083             reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
 | 
| 
 | 
  1084 
 | 
| 
 | 
  1085     # Create column names for pathways
 | 
| 
 | 
  1086     pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
 | 
| 
 | 
  1087 
 | 
| 
 | 
  1088     # Second pass: create the data
 | 
| 
 | 
  1089     for reaction_id, pathways in reaction_pathways.items():
 | 
| 
 | 
  1090         row = {"ReactionID": reaction_id}
 | 
| 
 | 
  1091         
 | 
| 
 | 
  1092         # Fill pathway columns
 | 
| 
 | 
  1093         for i in range(max_pathways):
 | 
| 
 | 
  1094             col_name = pathway_columns[i]
 | 
| 
 | 
  1095             if i < len(pathways):
 | 
| 
 | 
  1096                 row[col_name] = pathways[i]
 | 
| 
 | 
  1097             else:
 | 
| 
 | 
  1098                 row[col_name] = None  # or "" if you prefer empty strings
 | 
| 
 | 
  1099 
 | 
| 
 | 
  1100         pathway_data.append(row)
 | 
| 
 | 
  1101 
 | 
| 
 | 
  1102     return pd.DataFrame(pathway_data) |