| 
335
 | 
     1 import math
 | 
| 
 | 
     2 import re
 | 
| 
 | 
     3 import sys
 | 
| 
 | 
     4 import csv
 | 
| 
 | 
     5 import pickle
 | 
| 
 | 
     6 import lxml.etree as ET
 | 
| 
 | 
     7 
 | 
| 
 | 
     8 from enum import Enum
 | 
| 
 | 
     9 from itertools import count
 | 
| 
 | 
    10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union
 | 
| 
 | 
    11 
 | 
| 
 | 
    12 import pandas as pd
 | 
| 
 | 
    13 import cobra
 | 
| 
 | 
    14 
 | 
| 
 | 
    15 import zipfile
 | 
| 
 | 
    16 import gzip
 | 
| 
 | 
    17 import bz2
 | 
| 
 | 
    18 from io import StringIO
 | 
| 
 | 
    19 
 | 
| 
339
 | 
    20 class ValueErr(Exception):
 | 
| 
 | 
    21     def __init__(self, param_name, expected, actual):
 | 
| 
 | 
    22         super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
 | 
| 
 | 
    23 
 | 
| 
 | 
    24 class PathErr(Exception):
 | 
| 
 | 
    25     def __init__(self, path, message):
 | 
| 
 | 
    26         super().__init__(f"Path error for '{path}': {message}")
 | 
| 
 | 
    27 
 | 
| 
335
 | 
    28 class FileFormat(Enum):
 | 
| 
 | 
    29     """
 | 
| 
 | 
    30     Encodes possible file extensions to conditionally save data in a different format.
 | 
| 
 | 
    31     """
 | 
| 
 | 
    32     DAT    = ("dat",) # this is how galaxy treats all your files!
 | 
| 
 | 
    33     CSV    = ("csv",) # this is how most editable input data is written
 | 
| 
339
 | 
    34     TSV    = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
 | 
| 
335
 | 
    35     SVG    = ("svg",) # this is how most metabolic maps are written
 | 
| 
 | 
    36     PNG    = ("png",) # this is a common output format for images (such as metabolic maps)
 | 
| 
 | 
    37     PDF    = ("pdf",) # this is also a common output format for images, as it's required in publications.
 | 
| 
339
 | 
    38     
 | 
| 
 | 
    39     # Updated to include compressed variants
 | 
| 
 | 
    40     XML    = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
 | 
| 
 | 
    41     JSON   = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
 | 
| 
 | 
    42     
 | 
| 
335
 | 
    43     TXT = ("txt",) # this is how most output data is written
 | 
| 
 | 
    44     PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
 | 
| 
 | 
    45 
 | 
| 
339
 | 
    46     def __init__(self, *extensions):
 | 
| 
 | 
    47         self.extensions = extensions
 | 
| 
 | 
    48         # Store original extension when set via fromExt
 | 
| 
 | 
    49         self._original_extension = None
 | 
| 
 | 
    50 
 | 
| 
335
 | 
    51     @classmethod
 | 
| 
339
 | 
    52     def fromExt(cls, ext: str) -> "FileFormat":
 | 
| 
335
 | 
    53         """
 | 
| 
 | 
    54         Converts a file extension string to a FileFormat instance.
 | 
| 
 | 
    55         Args:
 | 
| 
 | 
    56             ext : The file extension as a string.
 | 
| 
 | 
    57         Returns:
 | 
| 
 | 
    58             FileFormat: The FileFormat instance corresponding to the file extension.
 | 
| 
 | 
    59         """
 | 
| 
 | 
    60         variantName = ext.upper()
 | 
| 
 | 
    61         if variantName in FileFormat.__members__: 
 | 
| 
 | 
    62             instance = FileFormat[variantName]
 | 
| 
339
 | 
    63             instance._original_extension = ext
 | 
| 
335
 | 
    64             return instance
 | 
| 
 | 
    65         
 | 
| 
339
 | 
    66         variantName = ext.lower()
 | 
| 
335
 | 
    67         for member in cls:
 | 
| 
 | 
    68             if variantName in member.value: 
 | 
| 
339
 | 
    69                 # Create a copy-like behavior by storing the original extension
 | 
| 
 | 
    70                 member._original_extension = ext
 | 
| 
335
 | 
    71                 return member
 | 
| 
 | 
    72         
 | 
| 
 | 
    73         raise ValueErr("ext", "a valid FileFormat file extension", ext)
 | 
| 
 | 
    74 
 | 
| 
 | 
    75     def __str__(self) -> str:
 | 
| 
 | 
    76         """
 | 
| 
 | 
    77         (Private) converts to str representation. Good practice for usage with argparse.
 | 
| 
 | 
    78         Returns:
 | 
| 
 | 
    79             str : the string representation of the file extension.
 | 
| 
 | 
    80         """
 | 
| 
339
 | 
    81         # If we have an original extension stored (for compressed files), use it
 | 
| 
 | 
    82         if hasattr(self, '_original_extension') and self._original_extension:
 | 
| 
 | 
    83             return self._original_extension
 | 
| 
 | 
    84         
 | 
| 
340
 | 
    85         # For XML and JSON without original extension, use the base extension
 | 
| 
 | 
    86         if self == FileFormat.XML:
 | 
| 
 | 
    87             return "xml"
 | 
| 
 | 
    88         elif self == FileFormat.JSON:
 | 
| 
 | 
    89             return "json"
 | 
| 
 | 
    90         
 | 
| 
339
 | 
    91         return self.value[-1]
 | 
| 
335
 | 
    92 
 | 
| 
 | 
    93 class FilePath():
 | 
| 
 | 
    94     """
 | 
| 
 | 
    95     Represents a file path. View this as an attempt to standardize file-related operations by expecting
 | 
| 
 | 
    96     values of this type in any process requesting a file path.
 | 
| 
 | 
    97     """
 | 
| 
339
 | 
    98     def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
 | 
| 
335
 | 
    99         """
 | 
| 
 | 
   100         (Private) Initializes an instance of FilePath.
 | 
| 
 | 
   101         Args:
 | 
| 
 | 
   102             path : the end of the path, containing the file name.
 | 
| 
 | 
   103             ext : the file's extension.
 | 
| 
 | 
   104             prefix : anything before path, if the last '/' isn't there it's added by the code.
 | 
| 
 | 
   105         Returns:
 | 
| 
 | 
   106             None : practically, a FilePath instance.
 | 
| 
 | 
   107         """
 | 
| 
339
 | 
   108         self.ext = ext
 | 
| 
335
 | 
   109         self.filePath = filePath
 | 
| 
 | 
   110 
 | 
| 
339
 | 
   111         if prefix and prefix[-1] != '/': 
 | 
| 
 | 
   112             prefix += '/'
 | 
| 
335
 | 
   113         self.prefix = prefix
 | 
| 
 | 
   114     
 | 
| 
 | 
   115     @classmethod
 | 
| 
339
 | 
   116     def fromStrPath(cls, path: str) -> "FilePath":
 | 
| 
335
 | 
   117         """
 | 
| 
 | 
   118         Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
 | 
| 
 | 
   119         It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
 | 
| 
 | 
   120         These double extensions are not supported for other file types such as .csv.
 | 
| 
 | 
   121         Args:
 | 
| 
 | 
   122             path : the string containing the path
 | 
| 
 | 
   123         Raises:
 | 
| 
 | 
   124             PathErr : if the provided string doesn't represent a valid path.
 | 
| 
 | 
   125         Returns:
 | 
| 
 | 
   126             FilePath : the constructed instance.
 | 
| 
 | 
   127         """
 | 
| 
 | 
   128         result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
 | 
| 
 | 
   129         if not result or not result["name"] or not result["ext"]:
 | 
| 
 | 
   130             raise PathErr(path, "cannot recognize folder structure or extension in path")
 | 
| 
 | 
   131 
 | 
| 
 | 
   132         prefix = result["prefix"] if result["prefix"] else ""
 | 
| 
 | 
   133         name, ext = result["name"], result["ext"]
 | 
| 
 | 
   134 
 | 
| 
339
 | 
   135         # Check for double extensions (json.gz, xml.zip, etc.)
 | 
| 
335
 | 
   136         parts = path.split(".")
 | 
| 
 | 
   137         if len(parts) >= 3:  
 | 
| 
 | 
   138             penultimate = parts[-2]
 | 
| 
 | 
   139             last = parts[-1]
 | 
| 
339
 | 
   140             double_ext = f"{penultimate}.{last}"
 | 
| 
 | 
   141             
 | 
| 
 | 
   142             # Try the double extension first
 | 
| 
 | 
   143             try:
 | 
| 
 | 
   144                 ext_format = FileFormat.fromExt(double_ext)
 | 
| 
335
 | 
   145                 name = ".".join(parts[:-2])
 | 
| 
339
 | 
   146                 # Extract prefix if it exists
 | 
| 
 | 
   147                 if '/' in name:
 | 
| 
 | 
   148                     prefix = name[:name.rfind('/') + 1]
 | 
| 
 | 
   149                     name = name[name.rfind('/') + 1:]
 | 
| 
 | 
   150                 return cls(name, ext_format, prefix=prefix)
 | 
| 
 | 
   151             except ValueErr:
 | 
| 
 | 
   152                 # If double extension doesn't work, fall back to single extension
 | 
| 
 | 
   153                 pass
 | 
| 
335
 | 
   154 
 | 
| 
339
 | 
   155         # Single extension fallback (original logic)
 | 
| 
 | 
   156         try:
 | 
| 
 | 
   157             ext_format = FileFormat.fromExt(ext)
 | 
| 
 | 
   158             return cls(name, ext_format, prefix=prefix)
 | 
| 
 | 
   159         except ValueErr:
 | 
| 
 | 
   160             raise PathErr(path, f"unsupported file extension: {ext}")
 | 
| 
335
 | 
   161 
 | 
| 
 | 
   162     def show(self) -> str:
 | 
| 
 | 
   163         """
 | 
| 
 | 
   164         Shows the path as a string.
 | 
| 
 | 
   165         Returns:
 | 
| 
 | 
   166             str : the path shown as a string.
 | 
| 
 | 
   167         """
 | 
| 
 | 
   168         return f"{self.prefix}{self.filePath}.{self.ext}"
 | 
| 
 | 
   169     
 | 
| 
339
 | 
   170     def __str__(self) -> str: 
 | 
| 
 | 
   171         return self.show()
 | 
| 
335
 | 
   172 
 | 
| 
 | 
   173 # ERRORS
 | 
| 
 | 
   174 def terminate(msg :str) -> None:
 | 
| 
 | 
   175     """
 | 
| 
 | 
   176     Terminate the execution of the script with an error message.
 | 
| 
 | 
   177     
 | 
| 
 | 
   178     Args:
 | 
| 
 | 
   179         msg (str): The error message to be displayed.
 | 
| 
 | 
   180     
 | 
| 
 | 
   181     Returns:
 | 
| 
 | 
   182         None
 | 
| 
 | 
   183     """
 | 
| 
 | 
   184     sys.exit(f"Execution aborted: {msg}\n")
 | 
| 
 | 
   185 
 | 
| 
 | 
   186 def logWarning(msg :str, loggerPath :str) -> None:
 | 
| 
 | 
   187     """
 | 
| 
 | 
   188     Log a warning message to an output log file and print it to the console. The final period and a
 | 
| 
 | 
   189     newline is added by the function.
 | 
| 
 | 
   190 
 | 
| 
 | 
   191     Args:
 | 
| 
 | 
   192         s (str): The warning message to be logged and printed.
 | 
| 
 | 
   193         loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
 | 
| 
 | 
   194         immediately read back (beware relative expensive operation, log with caution).
 | 
| 
 | 
   195 
 | 
| 
 | 
   196     Returns:
 | 
| 
 | 
   197         None
 | 
| 
 | 
   198     """
 | 
| 
 | 
   199     # building the path and then reading it immediately seems useless, but it's actually a way of
 | 
| 
 | 
   200     # validating that reduces repetition on the caller's side. Besides, logging a message by writing
 | 
| 
 | 
   201     # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
 | 
| 
 | 
   202     # mindlessly logging whenever something comes up, log at the very end and tell the user everything
 | 
| 
 | 
   203     # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
 | 
| 
 | 
   204     # the file only at the end of the program's execution.
 | 
| 
 | 
   205     with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
 | 
| 
 | 
   206 
 | 
| 
 | 
   207 class CustomErr(Exception):
 | 
| 
 | 
   208     """
 | 
| 
 | 
   209     Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
 | 
| 
 | 
   210     """
 | 
| 
 | 
   211     __idGenerator = count()
 | 
| 
 | 
   212     errName = "Custom Error"
 | 
| 
 | 
   213     def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
 | 
| 
 | 
   214         """
 | 
| 
 | 
   215         (Private) Initializes an instance of CustomErr.
 | 
| 
 | 
   216 
 | 
| 
 | 
   217         Args:
 | 
| 
 | 
   218             msg (str): Error message to be displayed.
 | 
| 
 | 
   219             details (str): Informs the user more about the error encountered. Defaults to "".
 | 
| 
 | 
   220             explicitErrCode (int): Explicit error code to be used. Defaults to -1.
 | 
| 
 | 
   221         
 | 
| 
 | 
   222         Returns:
 | 
| 
 | 
   223             None : practically, a CustomErr instance.
 | 
| 
 | 
   224         """
 | 
| 
 | 
   225         self.msg     = msg
 | 
| 
 | 
   226         self.details = details
 | 
| 
 | 
   227 
 | 
| 
 | 
   228         self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
 | 
| 
 | 
   229 
 | 
| 
 | 
   230     def throw(self, loggerPath = "") -> None:
 | 
| 
 | 
   231         """
 | 
| 
 | 
   232         Raises the current CustomErr instance, logging a warning message before doing so.
 | 
| 
 | 
   233 
 | 
| 
 | 
   234         Raises:
 | 
| 
 | 
   235             self: The current CustomErr instance.
 | 
| 
 | 
   236         
 | 
| 
 | 
   237         Returns:
 | 
| 
 | 
   238             None
 | 
| 
 | 
   239         """
 | 
| 
 | 
   240         if loggerPath: logWarning(str(self), loggerPath)
 | 
| 
 | 
   241         raise self
 | 
| 
 | 
   242 
 | 
| 
 | 
   243     def abort(self) -> None:
 | 
| 
 | 
   244         """
 | 
| 
 | 
   245         Aborts the execution of the script.
 | 
| 
 | 
   246         
 | 
| 
 | 
   247         Returns:
 | 
| 
 | 
   248             None
 | 
| 
 | 
   249         """
 | 
| 
 | 
   250         terminate(str(self))
 | 
| 
 | 
   251 
 | 
| 
 | 
   252     def __str__(self) -> str:
 | 
| 
 | 
   253         """
 | 
| 
 | 
   254         (Private) Returns a string representing the current CustomErr instance.
 | 
| 
 | 
   255 
 | 
| 
 | 
   256         Returns:
 | 
| 
 | 
   257             str: A string representing the current CustomErr instance.
 | 
| 
 | 
   258         """
 | 
| 
 | 
   259         return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
 | 
| 
 | 
   260 
 | 
| 
 | 
   261 class ArgsErr(CustomErr):
 | 
| 
 | 
   262     """
 | 
| 
 | 
   263     CustomErr subclass for UI arguments errors.
 | 
| 
 | 
   264     """
 | 
| 
 | 
   265     errName = "Args Error"
 | 
| 
 | 
   266     def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
 | 
| 
 | 
   267         super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
 | 
| 
 | 
   268 
 | 
| 
 | 
   269 class DataErr(CustomErr):
 | 
| 
 | 
   270     """
 | 
| 
 | 
   271     CustomErr subclass for data formatting errors.
 | 
| 
 | 
   272     """
 | 
| 
 | 
   273     errName = "Data Format Error"
 | 
| 
 | 
   274     def __init__(self, fileName :str, msg = "no further details provided") -> None:
 | 
| 
 | 
   275         super().__init__(f"file \"{fileName}\" contains malformed data", msg)
 | 
| 
 | 
   276 
 | 
| 
 | 
   277 class PathErr(CustomErr):
 | 
| 
 | 
   278     """
 | 
| 
 | 
   279     CustomErr subclass for filepath formatting errors.
 | 
| 
 | 
   280     """
 | 
| 
 | 
   281     errName = "Path Error"
 | 
| 
 | 
   282     def __init__(self, path :FilePath, msg = "no further details provided") -> None:
 | 
| 
 | 
   283         super().__init__(f"path \"{path}\" is invalid", msg)
 | 
| 
 | 
   284 
 | 
| 
 | 
   285 class ValueErr(CustomErr):
 | 
| 
 | 
   286     """
 | 
| 
 | 
   287     CustomErr subclass for any value error.
 | 
| 
 | 
   288     """
 | 
| 
 | 
   289     errName = "Value Error"
 | 
| 
 | 
   290     def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
 | 
| 
 | 
   291         super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
 | 
| 
 | 
   292 
 | 
| 
 | 
   293 # RESULT
 | 
| 
 | 
   294 T = TypeVar('T')
 | 
| 
 | 
   295 E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
 | 
| 
 | 
   296 class Result(Generic[T, E]):
 | 
| 
 | 
   297     class ResultErr(CustomErr):
 | 
| 
 | 
   298         """
 | 
| 
 | 
   299         CustomErr subclass for all Result errors.
 | 
| 
 | 
   300         """
 | 
| 
 | 
   301         errName = "Result Error"
 | 
| 
 | 
   302         def __init__(self, msg = "no further details provided") -> None:
 | 
| 
 | 
   303             super().__init__(msg)
 | 
| 
 | 
   304     """
 | 
| 
 | 
   305     Class to handle the result of an operation, with a value and a boolean flag to indicate
 | 
| 
 | 
   306     whether the operation was successful or not.
 | 
| 
 | 
   307     """
 | 
| 
 | 
   308     def __init__(self, value :Union[T, E], isOk :bool) -> None:
 | 
| 
 | 
   309         """
 | 
| 
 | 
   310         (Private) Initializes an instance of Result.
 | 
| 
 | 
   311 
 | 
| 
 | 
   312         Args:
 | 
| 
 | 
   313             value (Union[T, E]): The value to be stored in the Result instance.
 | 
| 
 | 
   314             isOk (bool): A boolean flag to indicate whether the operation was successful or not.
 | 
| 
 | 
   315         
 | 
| 
 | 
   316             Returns:
 | 
| 
 | 
   317                 None : practically, a Result instance.
 | 
| 
 | 
   318         """
 | 
| 
 | 
   319         self.isOk  = isOk
 | 
| 
 | 
   320         self.isErr = not isOk
 | 
| 
 | 
   321         self.value = value
 | 
| 
 | 
   322 
 | 
| 
 | 
   323     @classmethod
 | 
| 
 | 
   324     def Ok(cls,  value :T) -> "Result":
 | 
| 
 | 
   325         """
 | 
| 
 | 
   326         Constructs a new Result instance with a successful operation.
 | 
| 
 | 
   327 
 | 
| 
 | 
   328         Args:
 | 
| 
 | 
   329             value (T): The value to be stored in the Result instance, set as successful.
 | 
| 
 | 
   330 
 | 
| 
 | 
   331         Returns:
 | 
| 
 | 
   332             Result: A new Result instance with a successful operation.
 | 
| 
 | 
   333         """
 | 
| 
 | 
   334         return Result(value, isOk = True)
 | 
| 
 | 
   335     
 | 
| 
 | 
   336     @classmethod
 | 
| 
 | 
   337     def Err(cls, value :E) -> "Result": 
 | 
| 
 | 
   338         """
 | 
| 
 | 
   339         Constructs a new Result instance with a failed operation.
 | 
| 
 | 
   340 
 | 
| 
 | 
   341         Args:
 | 
| 
 | 
   342             value (E): The value to be stored in the Result instance, set as failed.
 | 
| 
 | 
   343 
 | 
| 
 | 
   344         Returns:
 | 
| 
 | 
   345             Result: A new Result instance with a failed operation.
 | 
| 
 | 
   346         """
 | 
| 
 | 
   347         return Result(value, isOk = False)
 | 
| 
 | 
   348 
 | 
| 
 | 
   349     def unwrap(self) -> T:
 | 
| 
 | 
   350         """
 | 
| 
 | 
   351         Unwraps the value of the Result instance, if the operation was successful.
 | 
| 
 | 
   352 
 | 
| 
 | 
   353         Raises:
 | 
| 
 | 
   354             ResultErr: If the operation was not successful.
 | 
| 
 | 
   355 
 | 
| 
 | 
   356         Returns:
 | 
| 
 | 
   357             T: The value of the Result instance, if the operation was successful.
 | 
| 
 | 
   358         """
 | 
| 
 | 
   359         if self.isOk: return self.value
 | 
| 
 | 
   360         raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
 | 
| 
 | 
   361 
 | 
| 
 | 
   362     def unwrapOr(self, default :T) -> T:
 | 
| 
 | 
   363         """
 | 
| 
 | 
   364         Unwraps the value of the Result instance, if the operation was successful, otherwise
 | 
| 
 | 
   365         it returns a default value.
 | 
| 
 | 
   366 
 | 
| 
 | 
   367         Args:
 | 
| 
 | 
   368             default (T): The default value to be returned if the operation was not successful.
 | 
| 
 | 
   369 
 | 
| 
 | 
   370         Returns:
 | 
| 
 | 
   371             T: The value of the Result instance, if the operation was successful,
 | 
| 
 | 
   372             otherwise the default value.
 | 
| 
 | 
   373         """
 | 
| 
 | 
   374         return self.value if self.isOk else default
 | 
| 
 | 
   375     
 | 
| 
 | 
   376     def expect(self, err :"Result.ResultErr") -> T:
 | 
| 
 | 
   377         """
 | 
| 
 | 
   378         Expects that the value of the Result instance is successful, otherwise it raises an error.
 | 
| 
 | 
   379 
 | 
| 
 | 
   380         Args:
 | 
| 
 | 
   381             err (Exception): The error to be raised if the operation was not successful.
 | 
| 
 | 
   382 
 | 
| 
 | 
   383         Raises:
 | 
| 
 | 
   384             err: The error raised if the operation was not successful.
 | 
| 
 | 
   385 
 | 
| 
 | 
   386         Returns:
 | 
| 
 | 
   387             T: The value of the Result instance, if the operation was successful.
 | 
| 
 | 
   388         """
 | 
| 
 | 
   389         if self.isOk: return self.value
 | 
| 
 | 
   390         raise err
 | 
| 
 | 
   391 
 | 
| 
 | 
   392     U = TypeVar("U")
 | 
| 
 | 
   393     def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
 | 
| 
 | 
   394         """
 | 
| 
 | 
   395         Maps the value of the current Result to whatever is returned by the mapper function.
 | 
| 
 | 
   396         If the Result contained an unsuccessful operation to begin with it remains unchanged
 | 
| 
 | 
   397         (a reference to the current instance is returned).
 | 
| 
 | 
   398         If the mapper function panics the returned result instance will be of the error kind.
 | 
| 
 | 
   399 
 | 
| 
 | 
   400         Args:
 | 
| 
 | 
   401             mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
 | 
| 
 | 
   402 
 | 
| 
 | 
   403         Returns:
 | 
| 
 | 
   404             Result[U, E]: The result of the mapper operation applied to the Result value.
 | 
| 
 | 
   405         """
 | 
| 
 | 
   406         if self.isErr: return self
 | 
| 
 | 
   407         try: return Result.Ok(mapper(self.value))
 | 
| 
 | 
   408         except Exception as e: return Result.Err(e)
 | 
| 
 | 
   409     
 | 
| 
 | 
   410     D = TypeVar("D", bound = "Result.ResultErr")
 | 
| 
 | 
   411     def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
 | 
| 
 | 
   412         """
 | 
| 
 | 
   413         Maps the error of the current Result to whatever is returned by the mapper function.
 | 
| 
 | 
   414         If the Result contained a successful operation it remains unchanged
 | 
| 
 | 
   415         (a reference to the current instance is returned).
 | 
| 
 | 
   416         If the mapper function panics this method does as well.
 | 
| 
 | 
   417 
 | 
| 
 | 
   418         Args:
 | 
| 
 | 
   419             mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
 | 
| 
 | 
   420 
 | 
| 
 | 
   421         Returns:
 | 
| 
 | 
   422             Result[U, E]: The result of the mapper operation applied to the Result error.
 | 
| 
 | 
   423         """
 | 
| 
 | 
   424         if self.isOk: return self
 | 
| 
 | 
   425         return Result.Err(mapper(self.value))
 | 
| 
 | 
   426 
 | 
| 
 | 
   427     def __str__(self):
 | 
| 
 | 
   428         return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
 | 
| 
 | 
   429 
 | 
| 
 | 
   430 # FILES
 | 
| 
 | 
   431 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
 | 
| 
 | 
   432     """
 | 
| 
 | 
   433     Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
 | 
| 
 | 
   434 
 | 
| 
 | 
   435     Args:
 | 
| 
 | 
   436         path : the path to the dataset file.
 | 
| 
 | 
   437         datasetName : the name of the dataset.
 | 
| 
 | 
   438 
 | 
| 
 | 
   439     Raises:
 | 
| 
 | 
   440         DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
 | 
| 
 | 
   441         it has less than 2 columns.
 | 
| 
 | 
   442     
 | 
| 
 | 
   443     Returns:
 | 
| 
 | 
   444         pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
 | 
| 
 | 
   445     """
 | 
| 
 | 
   446     # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
 | 
| 
 | 
   447     # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
 | 
| 
 | 
   448     # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
 | 
| 
 | 
   449     # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
 | 
| 
 | 
   450     # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
 | 
| 
 | 
   451     # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
 | 
| 
 | 
   452     try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
 | 
| 
 | 
   453     except:
 | 
| 
 | 
   454         try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
 | 
| 
 | 
   455         except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
 | 
| 
 | 
   456     
 | 
| 
 | 
   457     if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
 | 
| 
 | 
   458     return dataset
 | 
| 
 | 
   459 
 | 
| 
 | 
   460 def readPickle(path :FilePath) -> Any:
 | 
| 
 | 
   461     """
 | 
| 
 | 
   462     Reads the contents of a .pickle file, which needs to exist at the given path.
 | 
| 
 | 
   463 
 | 
| 
 | 
   464     Args:
 | 
| 
 | 
   465         path : the path to the .pickle file.
 | 
| 
 | 
   466     
 | 
| 
 | 
   467     Returns:
 | 
| 
 | 
   468         Any : the data inside a pickle file, could be anything.
 | 
| 
 | 
   469     """
 | 
| 
 | 
   470     with open(path.show(), "rb") as fd: return pickle.load(fd)
 | 
| 
 | 
   471 
 | 
| 
 | 
   472 def writePickle(path :FilePath, data :Any) -> None:
 | 
| 
 | 
   473     """
 | 
| 
 | 
   474     Saves any data in a .pickle file, created at the given path.
 | 
| 
 | 
   475 
 | 
| 
 | 
   476     Args:
 | 
| 
 | 
   477         path : the path to the .pickle file.
 | 
| 
 | 
   478         data : the data to be written to the file.
 | 
| 
 | 
   479     
 | 
| 
 | 
   480     Returns:
 | 
| 
 | 
   481         None
 | 
| 
 | 
   482     """
 | 
| 
 | 
   483     with open(path.show(), "wb") as fd: pickle.dump(data, fd)
 | 
| 
 | 
   484 
 | 
| 
 | 
   485 def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
 | 
| 
 | 
   486     """
 | 
| 
 | 
   487     Reads the contents of a .csv file, which needs to exist at the given path.
 | 
| 
 | 
   488 
 | 
| 
 | 
   489     Args:
 | 
| 
 | 
   490         path : the path to the .csv file.
 | 
| 
 | 
   491         delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
 | 
| 
 | 
   492         skipHeader : whether the first row of the file is a header and should be skipped.
 | 
| 
 | 
   493     
 | 
| 
 | 
   494     Returns:
 | 
| 
 | 
   495         List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
 | 
| 
 | 
   496     """
 | 
| 
 | 
   497     with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
 | 
| 
 | 
   498 
 | 
| 
 | 
   499 def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
 | 
| 
 | 
   500     """
 | 
| 
 | 
   501     Reads the contents of a .svg file, which needs to exist at the given path.
 | 
| 
 | 
   502 
 | 
| 
 | 
   503     Args:
 | 
| 
 | 
   504         path : the path to the .svg file.
 | 
| 
 | 
   505     
 | 
| 
 | 
   506     Raises:
 | 
| 
 | 
   507         DataErr : if the map is malformed.
 | 
| 
 | 
   508     
 | 
| 
 | 
   509     Returns:
 | 
| 
 | 
   510         Any : the data inside a svg file, could be anything.
 | 
| 
 | 
   511     """
 | 
| 
 | 
   512     try: return ET.parse(path.show())
 | 
| 
 | 
   513     except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
 | 
| 
 | 
   514         raise customErr if customErr else err
 | 
| 
 | 
   515 
 | 
| 
 | 
   516 def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
 | 
| 
 | 
   517     """
 | 
| 
 | 
   518     Saves svg data opened with lxml.etree in a .svg file, created at the given path.
 | 
| 
 | 
   519 
 | 
| 
 | 
   520     Args:
 | 
| 
 | 
   521         path : the path to the .svg file.
 | 
| 
 | 
   522         data : the data to be written to the file.
 | 
| 
 | 
   523     
 | 
| 
 | 
   524     Returns:
 | 
| 
 | 
   525         None
 | 
| 
 | 
   526     """
 | 
| 
 | 
   527     with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
 | 
| 
 | 
   528 
 | 
| 
 | 
   529 # UI ARGUMENTS
 | 
| 
 | 
   530 class Bool:
 | 
| 
 | 
   531     def __init__(self, argName :str) -> None:
 | 
| 
 | 
   532         self.argName = argName
 | 
| 
 | 
   533 
 | 
| 
 | 
   534     def __call__(self, s :str) -> bool: return self.check(s)
 | 
| 
 | 
   535 
 | 
| 
 | 
   536     def check(self, s :str) -> bool:
 | 
| 
 | 
   537         s = s.lower()
 | 
| 
 | 
   538         if s == "true" : return True
 | 
| 
 | 
   539         if s == "false": return False
 | 
| 
 | 
   540         raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
 | 
| 
 | 
   541 
 | 
| 
 | 
   542 class Float:
 | 
| 
 | 
   543     def __init__(self, argName = "Dataset values, not an argument") -> None:
 | 
| 
 | 
   544         self.argName = argName
 | 
| 
 | 
   545     
 | 
| 
 | 
   546     def __call__(self, s :str) -> float: return self.check(s)
 | 
| 
 | 
   547 
 | 
| 
 | 
   548     def check(self, s :str) -> float:
 | 
| 
 | 
   549         try: return float(s)
 | 
| 
 | 
   550         except ValueError:
 | 
| 
 | 
   551             s = s.lower()
 | 
| 
 | 
   552             if s == "nan" or s == "none": return math.nan
 | 
| 
 | 
   553             raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
 | 
| 
 | 
   554 
 | 
| 
 | 
   555 # MODELS
 | 
| 
 | 
   556 OldRule = List[Union[str, "OldRule"]]
 | 
| 
 | 
   557 class Model(Enum):
 | 
| 
 | 
   558     """
 | 
| 
 | 
   559     Represents a metabolic model, either custom or locally supported. Custom models don't point
 | 
| 
 | 
   560     to valid file paths.
 | 
| 
 | 
   561     """
 | 
| 
 | 
   562 
 | 
| 
 | 
   563     Recon   = "Recon"
 | 
| 
 | 
   564     ENGRO2  = "ENGRO2"
 | 
| 
 | 
   565     ENGRO2_no_legend = "ENGRO2_no_legend"
 | 
| 
 | 
   566     HMRcore = "HMRcore"
 | 
| 
 | 
   567     HMRcore_no_legend = "HMRcore_no_legend"
 | 
| 
 | 
   568     Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
 | 
| 
 | 
   569 
 | 
| 
 | 
   570     def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
 | 
| 
 | 
   571         if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
 | 
| 
 | 
   572 
 | 
| 
 | 
   573     def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
 | 
| 
 | 
   574         """
 | 
| 
 | 
   575         Open "rules" file for this model.
 | 
| 
 | 
   576 
 | 
| 
 | 
   577         Returns:
 | 
| 
 | 
   578             Dict[str, Dict[str, OldRule]] : the rules for this model.
 | 
| 
 | 
   579         """
 | 
| 
 | 
   580         path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
 | 
| 
 | 
   581         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   582         return readPickle(path)
 | 
| 
 | 
   583     
 | 
| 
 | 
   584     def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
 | 
| 
 | 
   585         """
 | 
| 
 | 
   586         Open "gene translator (old: gene_in_rule)" file for this model.
 | 
| 
 | 
   587 
 | 
| 
 | 
   588         Returns:
 | 
| 
 | 
   589             Dict[str, Dict[str, str]] : the translator dict for this model.
 | 
| 
 | 
   590         """
 | 
| 
 | 
   591         path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
 | 
| 
 | 
   592         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   593         return readPickle(path)
 | 
| 
 | 
   594     
 | 
| 
 | 
   595     def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
 | 
| 
 | 
   596         path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
 | 
| 
 | 
   597         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   598         return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
 | 
| 
 | 
   599     
 | 
| 
 | 
   600     def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
 | 
| 
 | 
   601         if(self is Model.Custom):
 | 
| 
 | 
   602             return self.load_custom_model(customPath, customExtension)
 | 
| 
 | 
   603         else:
 | 
| 
 | 
   604             return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
 | 
| 
 | 
   605         
 | 
| 
 | 
   606     def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
 | 
| 
 | 
   607         ext = ext if ext else file_path.ext
 | 
| 
 | 
   608         try:
 | 
| 
 | 
   609             if ext in FileFormat.XML:
 | 
| 
 | 
   610                 return cobra.io.read_sbml_model(file_path.show())
 | 
| 
 | 
   611             
 | 
| 
 | 
   612             if ext in FileFormat.JSON:
 | 
| 
 | 
   613                 # Compressed files are not automatically handled by cobra
 | 
| 
 | 
   614                 if(ext == "json"):
 | 
| 
 | 
   615                     return cobra.io.load_json_model(file_path.show())
 | 
| 
 | 
   616                 else: 
 | 
| 
 | 
   617                     return self.extract_json_model(file_path, ext)
 | 
| 
 | 
   618 
 | 
| 
 | 
   619         except Exception as e: raise DataErr(file_path, e.__str__())
 | 
| 
 | 
   620         raise DataErr(file_path,
 | 
| 
 | 
   621             f"Fomat \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.")
 | 
| 
 | 
   622     
 | 
| 
 | 
   623 
 | 
| 
 | 
   624     def extract_json_model(file_path:FilePath, ext :FileFormat) -> cobra.Model:
 | 
| 
 | 
   625         """
 | 
| 
 | 
   626         Extract json COBRA model from a compressed file (zip, gz, bz2).
 | 
| 
 | 
   627         
 | 
| 
 | 
   628         Args:
 | 
| 
 | 
   629             file_path: File path of the model
 | 
| 
 | 
   630             ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
 | 
| 
 | 
   631             
 | 
| 
 | 
   632         Returns:
 | 
| 
 | 
   633             cobra.Model: COBRApy model 
 | 
| 
 | 
   634             
 | 
| 
 | 
   635         Raises:
 | 
| 
 | 
   636             Exception: Extraction errors
 | 
| 
 | 
   637         """
 | 
| 
 | 
   638         ext_str = str(ext)
 | 
| 
 | 
   639 
 | 
| 
 | 
   640         try:
 | 
| 
 | 
   641             if '.zip' in ext_str:
 | 
| 
 | 
   642                 with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
 | 
| 
 | 
   643                     with zip_ref.open(zip_ref.namelist()[0]) as json_file:
 | 
| 
 | 
   644                         content = json_file.read().decode('utf-8')
 | 
| 
 | 
   645                         return cobra.io.load_json_model(StringIO(content))
 | 
| 
 | 
   646             elif '.gz' in ext_str:
 | 
| 
 | 
   647                 with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
 | 
| 
 | 
   648                     return cobra.io.load_json_model(gz_ref)
 | 
| 
 | 
   649             elif '.bz2' in ext_str:
 | 
| 
 | 
   650                 with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
 | 
| 
 | 
   651                     return cobra.io.load_json_model(bz2_ref)
 | 
| 
 | 
   652             else:
 | 
| 
 | 
   653                 raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")
 | 
| 
 | 
   654             
 | 
| 
 | 
   655         except Exception as e:
 | 
| 
 | 
   656             raise Exception(f"Error during model extraction: {str(e)}")
 | 
| 
 | 
   657         
 | 
| 
 | 
   658 
 | 
| 
 | 
   659 
 | 
| 
240
 | 
   660     def __str__(self) -> str: return self.value |