| 335 | 1 import math | 
|  | 2 import re | 
|  | 3 import sys | 
|  | 4 import csv | 
|  | 5 import pickle | 
|  | 6 import lxml.etree as ET | 
|  | 7 | 
|  | 8 from enum import Enum | 
|  | 9 from itertools import count | 
|  | 10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union | 
|  | 11 | 
|  | 12 import pandas as pd | 
|  | 13 import cobra | 
|  | 14 | 
|  | 15 import zipfile | 
|  | 16 import gzip | 
|  | 17 import bz2 | 
|  | 18 from io import StringIO | 
|  | 19 | 
|  | 20 # FILES | 
class FileFormat(Enum):
    """
    Encodes possible file extensions to conditionally save data in a different format.

    Each member's value is a tuple holding every extension string recognized for that format.
    """
    DAT    = ("dat",) # this is how galaxy treats all your files!
    CSV    = ("csv",) # this is how most editable input data is written
    TSV    = ("tsv",) # this is how most editable input data is ACTUALLY written

    SVG    = ("svg",) # this is how most metabolic maps are written
    PNG    = ("png",) # this is a common output format for images (such as metabolic maps)
    PDF    = ("pdf",) # this is also a common output format for images, as it's required in publications.

    XML    = ("xml","xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
    JSON   = ("json","json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed

    TXT = ("txt",) # this is how most output data is written

    PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved


    @classmethod
    def fromExt(cls, ext :str) -> "FileFormat":
        """
        Converts a file extension string to a FileFormat instance.

        The extension is first matched against the member names (case insensitive), then against
        every extension listed in each member's value. The string received is stored on the matching
        member as `original_extension`.

        Args:
            ext : The file extension as a string.

        Raises:
            ValueErr : if the extension doesn't belong to any FileFormat member.

        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        variantName = ext.upper()
        if variantName in cls.__members__:
            instance = cls[variantName]
            # NOTE: enum members are singletons, so this attribute is shared by every user of the member
            # and is overwritten by each later call that resolves to the same member.
            instance.original_extension = ext
            return instance

        variantName = variantName.lower()
        for member in cls:
            if variantName in member.value:
                member.original_extension = ext
                return member

        raise ValueErr("ext", "a valid FileFormat file extension", ext)

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the string representation of the file extension.
        """
        # XML and JSON list several (compressed) spellings, so the extension seen by fromExt is returned;
        # members that were never produced by fromExt fall back to their plain, uncompressed extension.
        if self.value[0] in ("json", "xml"):
            return getattr(self, "original_extension", self.value[0])

        return self.value[-1] # for all other formats and pickle
|  | 78 | 
class FilePath():
    """
    Represents a file path. View this as an attempt to standardize file-related operations by expecting
    values of this type in any process requesting a file path.
    """
    def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
        """
        (Private) Initializes an instance of FilePath.

        Args:
            filePath : the end of the path, containing the file name (without extension).
            ext : the file's extension.
            prefix : anything before filePath, if the last '/' isn't there it's added by the code.

        Returns:
            None : practically, a FilePath instance.
        """
        self.ext      = ext
        self.filePath = filePath

        if prefix and prefix[-1] != '/': prefix += '/'
        self.prefix = prefix

    @classmethod
    def fromStrPath(cls, path :str) -> "FilePath":
        """
        Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
        It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
        These double extensions are not supported for other file types such as .csv.

        Args:
            path : the string containing the path

        Raises:
            PathErr : if the provided string doesn't represent a valid path.
            ValueErr : (from FileFormat.fromExt) if the extension isn't a known FileFormat extension.

        Returns:
            FilePath : the constructed instance.
        """
        # This method is often used to construct FilePath instances from ARGS UI arguments. These arguments *should*
        # always be correct paths and could be used as raw strings, however most if not all functions that work with
        # file paths request the FilePath objects specifically, which is a very good thing in any case other than this.
        # What ends up happening is we spend time parsing a string into a FilePath so that the function accepts it, only
        # to call show() immediately to bring back the string and open the file it points to.
        # TODO: this is an indication that the arguments SHOULD BE OF TYPE FilePath if they are filepaths, this ENSURES
        # their correctness when modifying the UI and avoids the pointless back-and-forth.
        result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not result or not result["name"] or not result["ext"]:
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        prefix = result["prefix"] if result["prefix"] else ""
        name, ext = result["name"], result["ext"]

        # Look for a double extension on the file name ONLY: splitting the whole path would drag the
        # folder prefix into the name (duplicating it in show()) and trip over dots in folder names.
        stem, dot, innerExt = name.rpartition(".")
        if dot and innerExt in {"json", "xml"}:
            if not stem:
                raise PathErr(path, "cannot recognize folder structure or extension in path")

            name = stem
            ext = f"{innerExt}.{ext}"

        return cls(name, FileFormat.fromExt(ext), prefix=prefix)

    def show(self) -> str:
        """
        Shows the path as a string.

        Returns:
            str : the path shown as a string.
        """
        return f"{self.prefix}{self.filePath}.{self.ext}"

    def __str__(self) -> str: return self.show()
|  | 153 | 
|  | 154 # ERRORS | 
def terminate(msg :str) -> None:
    """
    Stops the script by exiting the interpreter with an error message.

    Args:
        msg (str): The reason for the abort, appended to the "Execution aborted" notice.

    Returns:
        None
    """
    abortNotice = f"Execution aborted: {msg}\n"
    sys.exit(abortNotice)
|  | 166 | 
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Appends a warning message to an output log file. The final period and a newline are added
    by the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately read back (beware relative expensive operation, log with caution).

    Returns:
        None
    """
    # building the path and then reading it immediately seems useless, but it's actually a way of
    # validating that reduces repetition on the caller's side. Besides, logging a message by writing
    # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
    # mindlessly logging whenever something comes up, log at the very end and tell the user everything
    # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
    # the file only at the end of the program's execution.
    validatedPath = FilePath.fromStrPath(loggerPath).show()
    with open(validatedPath, 'a') as log:
        log.write(f"{msg}.\n")
|  | 187 | 
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.

    Subclasses customize the label shown in the error's string form by overriding `errName`.
    """
    __idGenerator = count()
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg     = msg
        self.details = details

        # the shared counter advances on every construction; the explicit code only wins when it's larger.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, logging a warning message before doing so.

        Args:
            loggerPath (str): path of the log file to append to; nothing is logged when empty.

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath: logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: A string representing the current CustomErr instance.
        """
        # errName is looked up through the instance so that subclasses show their own label.
        return f"{self.errName} #{self.id}: {self.msg}, {self.details}."
|  | 241 | 
class ArgsErr(CustomErr):
    """
    Error raised when a UI argument doesn't hold the kind of value that was expected.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        # the mismatch description becomes the error message, msg is forwarded as the details.
        description = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(description, msg)
|  | 249 | 
class DataErr(CustomErr):
    """
    Error raised when the data contained in a file is malformed.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        # the file name is embedded in the error message, msg is forwarded as the details.
        description = f"file \"{fileName}\" contains malformed data"
        super().__init__(description, msg)
|  | 257 | 
class PathErr(CustomErr):
    """
    Error raised when a file path is not formatted correctly.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        # the path is rendered through string formatting, msg is forwarded as the details.
        description = f"path \"{path}\" is invalid"
        super().__init__(description, msg)
|  | 265 | 
class ValueErr(CustomErr):
    """
    Error raised when any value differs from what was expected.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        # the quoted name is only part of the message when a name was actually given.
        quotedName = f"\"{valueName}\" "
        if not valueName: quotedName = ""

        description = "value " + quotedName + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(description, msg)
|  | 273 | 
|  | 274 # RESULT | 
# Type parameter for the value held by a successful Result.
T = TypeVar('T')
# Type parameter for the value held by a failed Result, bound to CustomErr.
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        (Private) Initializes an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.isOk  = isOk
        self.isErr = not isOk
        self.value = value

    @classmethod
    def Ok(cls,  value :T) -> "Result":
        """
        Constructs a new Result instance with a successful operation.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        return cls(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Constructs a new Result instance with a failed operation.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        return cls(value, isOk = False)

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        return self.value if self.isOk else default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise err

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics the returned result instance will be of the error kind,
        holding the exception that was raised.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr: return self
        try: return Result.Ok(mapper(self.value))
        except Exception as e: return Result.Err(e)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics this method does as well.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk: return self
        return Result.Err(mapper(self.value))

    def __str__(self):
        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
|  | 410 | 
|  | 411 # FILES | 
def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
    """
    Reads a .csv or .tsv file and returns it as a Pandas DataFrame.

    The file is first parsed as tab-separated with no header row; only if that attempt raises
    it is parsed again as comma-separated, taking the first row as the header.

    Args:
        path : the path to the dataset file.
        datasetName : the name of the dataset.

    Raises:
        DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
        it has less than 2 columns.

    Returns:
        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
    """
    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
    # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
    try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
    except Exception: # not a bare except: KeyboardInterrupt and SystemExit must not trigger the fallback
        try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
        except Exception as err:
            raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") from err

    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
    return dataset
|  | 440 | 
def readPickle(path :FilePath) -> Any:
    """
    Loads whatever object is stored in a .pickle file, which needs to exist at the given path.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : the data inside a pickle file, could be anything.
    """
    # NOTE(review): unpickling can execute arbitrary code, only open files coming from trusted sources.
    with open(path.show(), "rb") as pickleFile:
        contents = pickle.load(pickleFile)
        return contents
|  | 452 | 
def writePickle(path :FilePath, data :Any) -> None:
    """
    Serializes any data into a .pickle file, created at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    targetPath = path.show()
    with open(targetPath, "wb") as pickleFile:
        pickle.dump(data, pickleFile)
|  | 465 | 
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Parses the rows of a .csv file, which needs to exist at the given path.

    Args:
        path : the path to the .csv file.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by
        the delimiter.
    """
    with open(path.show(), "r", newline = "") as csvFile:
        rows = list(csv.reader(csvFile, delimiter = delimiter))

    # the flag doubles as the slice start: True drops the first row, False keeps every row.
    return rows[skipHeader:]
|  | 479 | 
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parses a .svg file, which needs to exist at the given path, with lxml.etree.

    Args:
        path : the path to the .svg file.
        customErr : error to raise in place of lxml's own one when the file can't be parsed.

    Raises:
        customErr : if provided and lxml reports a syntax or schema parsing error.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : the original lxml error, when customErr is not provided.

    Returns:
        the tree returned by lxml.etree.parse for the file.
    """
    try:
        return ET.parse(path.show())

    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        if customErr: raise customErr
        raise err
|  | 496 | 
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Serializes svg data opened with lxml.etree into a .svg file, created at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as svgFile:
        serialized = ET.tostring(data)
        svgFile.write(serialized)
|  | 509 | 
|  | 510 # UI ARGUMENTS | 
class Bool:
    """
    Callable parser turning the strings "true" / "false" (not case sensitive) into booleans,
    reporting failures under the name of the argument it was built for.
    """
    def __init__(self, argName :str) -> None:
        self.argName = argName

    def __call__(self, s :str) -> bool:
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Parses a boolean string.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if the string is neither "true" nor "false" (not case sensitive).

        Returns:
            bool : the parsed value.
        """
        lowered = s.lower()
        recognized = { "true" : True, "false" : False }
        if lowered in recognized: return recognized[lowered]

        raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")
|  | 522 | 
class Float:
    """
    Callable parser turning numeric strings into floats, also accepting "None" and "NaN"
    (not case sensitive) as math.nan. Failures are reported under the configured argument name.
    """
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        self.argName = argName

    def __call__(self, s :str) -> float:
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Parses a numeric string.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if float() rejects the string and it isn't "none" or "nan" (not case sensitive).

        Returns:
            float : the parsed value, math.nan for the "none" and "nan" spellings.
        """
        try:
            return float(s)

        except ValueError:
            lowered = s.lower()
            if lowered in ("nan", "none"): return math.nan
            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
|  | 535 | 
|  | 536 # MODELS | 
# Recursive type alias: a list whose items are either strings or nested lists of the same kind.
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths.
    """

    Recon   = "Recon"
    ENGRO2  = "ENGRO2"
    ENGRO2_no_legend = "ENGRO2_no_legend"
    HMRcore = "HMRcore"
    HMRcore_no_legend = "HMRcore_no_legend"
    Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """
        Raises a PathErr when no path is available to read a model's file from.

        Args:
            path : the path about to be opened, possibly missing.

        Raises:
            PathErr : if path is missing.
        """
        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder of the supported models.
            customPath : path to the rules file, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath was given.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder of the supported models.
            customPath : path to the translator file, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath was given.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the svg metabolic map for this model.

        Args:
            toolDir : directory containing the "local/svg metabolic maps/" folder of the supported models.
            customPath : path to the map file, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath was given.
            DataErr : if the map can't be parsed.

        Returns:
            the parsed svg map.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
        self.__raiseMissingPathErr(path)
        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))

    def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FileFormat]=None)->cobra.Model:
        """
        Load the COBRA model for this variant.

        Args:
            toolDir : directory containing the "local/models/" folder of the supported models.
            customPath : path to the model file, only used (and required) by the Custom variant.
            customExtension : format of the custom file, when it differs from the extension in customPath.

        Raises:
            PathErr : if the model is Custom and no customPath was given.
            DataErr : if the custom model can't be loaded.

        Returns:
            cobra.Model : the loaded model.
        """
        if self is Model.Custom:
            self.__raiseMissingPathErr(customPath)
            return self.load_custom_model(customPath, customExtension)

        return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())

    def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load a user-provided COBRA model from an XML (SBML) or JSON file.

        Args:
            file_path : path to the model file.
            ext : format of the file, defaults to the extension held by file_path.

        Raises:
            DataErr : if loading fails or the format is neither XML nor JSON.

        Returns:
            cobra.Model : the loaded model.
        """
        ext = ext if ext else file_path.ext
        try:
            # Formats are compared through their extension string: an enum member is not a container
            # (so `in` can't be used on it) and never equals a plain string.
            ext_str = str(ext).lower()
            if ext_str in FileFormat.XML.value:
                return cobra.io.read_sbml_model(file_path.show())

            if ext_str in FileFormat.JSON.value:
                # Compressed files are not automatically handled by cobra
                if ext_str == "json":
                    return cobra.io.load_json_model(file_path.show())

                return self.extract_json_model(file_path, ext)

        except Exception as e: raise DataErr(file_path, str(e)) from e
        raise DataErr(file_path,
            f"Format \"{ext}\" is not recognized, only JSON and XML files are supported.")

    @staticmethod
    def extract_json_model(file_path:FilePath, ext :FileFormat) -> cobra.Model:
        """
        Extract json COBRA model from a compressed file (zip, gz, bz2).

        Declared static so that it works both when called on the class and through an instance.

        Args:
            file_path: File path of the model
            ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)

        Returns:
            cobra.Model: COBRApy model

        Raises:
            Exception: Extraction errors
        """
        ext_str = str(ext).lower()

        try:
            if '.zip' in ext_str:
                with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
                    # only the first entry of the archive is read
                    with zip_ref.open(zip_ref.namelist()[0]) as json_file:
                        content = json_file.read().decode('utf-8')
                        return cobra.io.load_json_model(StringIO(content))
            elif '.gz' in ext_str:
                with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
                    return cobra.io.load_json_model(gz_ref)
            elif '.bz2' in ext_str:
                with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
                    return cobra.io.load_json_model(bz2_ref)
            else:
                raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")

        except Exception as e:
            raise Exception(f"Error during model extraction: {str(e)}") from e

    def __str__(self) -> str: return self.value