comparison COBRAxy/src/utils/general_utils.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children
comparison
equal deleted inserted replaced
538:fd53d42348bd 539:2fb97466e404
1 """
2 General utilities for COBRAxy.
3
4 This module provides:
5 - File and path helpers (FileFormat, FilePath)
6 - Error and result handling utilities (CustomErr, Result)
7 - Basic I/O helpers (CSV/TSV, pickle, SVG)
8 - Lightweight CLI argument parsers (Bool, Float)
9 - Model loader utilities for COBRA models, including compressed formats
10 """
11 import math
12 import re
13 import sys
14 import csv
15 import pickle
16 import lxml.etree as ET
17
18 from enum import Enum
19 from itertools import count
20 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple
21
22 import pandas as pd
23 import cobra
24
25 import zipfile
26 import gzip
27 import bz2
28 from io import StringIO
29
30
31 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple
class ValueErr(Exception):
    """
    Plain exception describing a parameter that received an unexpected value.

    NOTE: the name ``ValueErr`` is bound a second time further down in this module
    (by a CustomErr subclass); code that looks the name up at call time gets that
    later class, not this one.
    """
    def __init__(self, param_name, expected, actual):
        """
        Args:
            param_name : name of the offending parameter.
            expected : description of what was expected.
            actual : what was received instead.
        """
        description = f"Invalid value for {param_name}: expected {expected}, got {actual}"
        super().__init__(description)
35
class PathErr(Exception):
    """
    Plain exception describing a problem with a file path.

    NOTE: the name ``PathErr`` is bound a second time further down in this module
    (by a CustomErr subclass); code that looks the name up at call time gets that
    later class, not this one.
    """
    def __init__(self, path, message):
        """
        Args:
            path : the path that caused the problem.
            message : explanation of what is wrong with it.
        """
        description = f"Path error for '{path}': {message}"
        super().__init__(description)
39
class FileFormat(Enum):
    """
    Encodes possible file extensions to conditionally save data in a different format.

    The value of each member is the tuple of extensions it recognizes.
    """
    DAT = ("dat",)
    CSV = ("csv",)
    TSV = ("tsv",)
    SVG = ("svg",)
    PNG = ("png",)
    PDF = ("pdf",)

    # Compressed variants for common model formats
    XML = ("xml", "xml.gz", "xml.zip", "xml.bz2")
    JSON = ("json", "json.gz", "json.zip", "json.bz2")
    MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2")
    YML = ("yml", "yml.gz", "yml.zip", "yml.bz2")

    TXT = ("txt",)
    PICKLE = ("pickle", "pk", "p")

    def __init__(self, *extensions):
        # Extension string last requested through fromExt (None until that happens).
        self._original_extension = None
        self.extensions = extensions

    @classmethod
    def fromExt(cls, ext: str) -> "FileFormat":
        """
        Converts a file extension string to a FileFormat instance.

        The member name is tried first (case-insensitively), then the extension is
        searched (lower-cased) among the values of every member.

        NOTE: enum members are singletons, so the extension remembered here is
        shared by every user of that member and overwritten by later calls.

        Args:
            ext : The file extension as a string.
        Raises:
            ValueErr : if no member matches the extension.
        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        byName = FileFormat.__members__.get(ext.upper())
        if byName is not None:
            byName._original_extension = ext
            return byName

        lowered = ext.lower()
        found = next((fmt for fmt in cls if lowered in fmt.value), None)
        if found is None:
            raise ValueErr("ext", "a valid FileFormat file extension", ext)

        # Remember the spelling that was asked for so __str__ can reproduce it.
        found._original_extension = ext
        return found

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the extension remembered by fromExt when there is one; otherwise the
            first extension for the model formats (XML, JSON, MAT, YML) and the last
            listed extension for every other member.
        """
        remembered = getattr(self, '_original_extension', None)
        if remembered:
            return remembered

        if self in (FileFormat.XML, FileFormat.JSON, FileFormat.MAT, FileFormat.YML):
            # The uncompressed extension is the first entry of the tuple.
            return self.value[0]

        return self.value[-1]
108
class FilePath():
    """
    Represents a file path with format-aware helpers.

    The path is kept in three pieces: an optional directory prefix, the file name
    stem and the extension.
    """
    def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
        """
        Initialize FilePath.
        Args:
            filePath: File name stem.
            ext: File extension (FileFormat).
            prefix: Optional directory path (trailing '/' auto-added).
        """
        if prefix and not prefix.endswith('/'):
            prefix = f"{prefix}/"

        self.prefix = prefix
        self.filePath = filePath
        self.ext = ext

    @classmethod
    def fromStrPath(cls, path: str) -> "FilePath":
        """
        Parse a string path into a FilePath, supporting double extensions for models (e.g., .json.gz).
        Args:
            path : the string containing the path
        Raises:
            PathErr : if the provided string doesn't represent a valid path.
        Returns:
            FilePath : the constructed instance.
        """
        parsed = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (parsed and parsed["name"] and parsed["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        prefix = parsed["prefix"] or ""
        name, ext = parsed["name"], parsed["ext"]

        # With at least two dots the last two pieces may form a compound
        # extension such as "json.gz": try that interpretation first.
        pieces = path.split(".")
        if len(pieces) >= 3:
            compoundExt = f"{pieces[-2]}.{pieces[-1]}"
            try:
                compoundFormat = FileFormat.fromExt(compoundExt)
            except ValueErr:
                compoundFormat = None

            if compoundFormat is not None:
                stem = ".".join(pieces[:-2])
                if '/' in stem:
                    cut = stem.rfind('/') + 1
                    prefix, stem = stem[:cut], stem[cut:]
                return cls(stem, compoundFormat, prefix=prefix)

        # Fall back to the single extension captured by the regex.
        try:
            return cls(name, FileFormat.fromExt(ext), prefix=prefix)
        except ValueErr:
            raise PathErr(path, f"unsupported file extension: {ext}")

    def show(self) -> str:
        """
        Shows the path as a string.
        Returns:
            str : prefix, file name stem and extension joined into one path string.
        """
        return f"{self.prefix}{self.filePath}.{self.ext}"

    def __str__(self) -> str:
        """Same string as show()."""
        return self.show()
178
179 # ERRORS
def terminate(msg :str) -> None:
    """
    Stop the script, reporting an error message through sys.exit.

    Args:
        msg (str): The error message to be displayed.

    Returns:
        None : in practice never returns, since sys.exit raises SystemExit.
    """
    farewell = f"Execution aborted: {msg}\n"
    sys.exit(farewell)
191
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Append a warning message to an output log file. The final period and a
    newline are added by the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately converted back to a string (this validates the path on every call, so log with caution).

    Returns:
        None
    """
    # Round-trip through FilePath so that malformed paths are rejected before opening.
    logFile = FilePath.fromStrPath(loggerPath).show()
    with open(logFile, 'a') as handle:
        handle.write(f"{msg}.\n")
207
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.

    Subclasses override ``errName`` to change the label shown in the string representation.
    """
    __idGenerator = count()
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg = msg
        self.details = details

        # The shared counter advances on every construction; the explicit code
        # wins only when it is larger than the generated one.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, optionally logging it first.

        Args:
            loggerPath (str): Optional path to a log file to append this error before raising.

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath:
            logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script, using this error as the exit message.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: "<errName> #<id>: <msg>, <details>." where errName is the one
            declared by the instance's own class.
        """
        # Look errName up through the instance so subclass overrides are honoured
        # (hard-coding CustomErr.errName labelled every subclass "Custom Error").
        return f"{self.errName} #{self.id}: {self.msg}, {self.details}."
265
class ArgsErr(CustomErr):
    """
    CustomErr subclass for UI arguments errors.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        Args:
            argName (str): name of the offending argument.
            expected (Any): description of the accepted value(s).
            actual (Any): the value that was received.
            msg (str): extra details, forwarded as the error's details.
        """
        headline = f'argument "{argName}" expected {expected} but got {actual}'
        super().__init__(headline, msg)
273
class DataErr(CustomErr):
    """
    CustomErr subclass for data formatting errors.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        """
        Args:
            fileName (str): name of the file holding the malformed data.
            msg (str): extra details, forwarded as the error's details.
        """
        headline = f'file "{fileName}" contains malformed data'
        super().__init__(headline, msg)
281
class PathErr(CustomErr):
    """
    CustomErr subclass for filepath formatting errors.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        """
        Args:
            path (FilePath): the invalid path (formatted into the message with str()).
            msg (str): extra details, forwarded as the error's details.
        """
        headline = f'path "{path}" is invalid'
        super().__init__(headline, msg)
289
class ValueErr(CustomErr):
    """
    CustomErr subclass for any value error.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        Args:
            valueName (str): name of the offending value; left out of the message when empty.
            expected (Any): description of what the value should have been.
            actual (Any): the value that was received.
            msg (str): extra details, forwarded as the error's details.
        """
        # The quoted name (with its trailing space) appears only for a non-empty name.
        quotedName = f'"{valueName}" ' if valueName else ""
        headline = "value " + quotedName + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(headline, msg)
297
298 # RESULT
T = TypeVar('T')
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        Initialize an instance of Result.

        Args:
            value (Union[T, E]): The payload: a success value or an error.
            isOk (bool): True when the operation succeeded, False otherwise.

        Returns:
            None : practically, a Result instance.
        """
        self.value = value
        self.isOk = isOk
        self.isErr = not isOk

    @classmethod
    def Ok(cls, value :T) -> "Result":
        """
        Construct a successful Result.

        Args:
            value (T): The value to wrap.

        Returns:
            Result: A new Result flagged as successful.
        """
        return Result(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Construct a failed Result.

        Args:
            value (E): The error to wrap.

        Returns:
            Result: A new Result flagged as failed.
        """
        return Result(value, isOk = False)

    def unwrap(self) -> T:
        """
        Return the wrapped value of a successful Result.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The wrapped value.
        """
        if not self.isOk:
            raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
        return self.value

    def unwrapOr(self, default :T) -> T:
        """
        Return the wrapped value of a successful Result, or a fallback otherwise.

        Args:
            default (T): The value returned when the operation was not successful.

        Returns:
            T: The wrapped value on success, otherwise the default value.
        """
        if self.isOk:
            return self.value
        return default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Return the wrapped value of a successful Result, raising the given error otherwise.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: When the operation was not successful.

        Returns:
            T: The wrapped value.
        """
        if not self.isOk:
            raise err
        return self.value

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Apply the mapper function to the value of a successful Result.
        A failed Result is left untouched (the current instance itself is returned).
        If the mapper raises an Exception, a failed Result wrapping that exception is returned.

        Args:
            mapper (Callable[[T], U]): The operation to apply to the wrapped value.

        Returns:
            Result[U, E]: The Result of the mapping.
        """
        if self.isErr:
            return self

        try:
            mapped = mapper(self.value)
        except Exception as failure:
            return Result.Err(failure)
        return Result.Ok(mapped)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Apply the mapper function to the error of a failed Result.
        A successful Result is left untouched (the current instance itself is returned).
        Exceptions raised by the mapper are not caught.

        Args:
            mapper (Callable[[E], D]): The operation to apply to the wrapped error.

        Returns:
            Result[T, D]: The Result with its error mapped.
        """
        return self if self.isOk else Result.Err(mapper(self.value))

    def __str__(self):
        """Render as "Result::Ok(<value>)" or "Result::Err(<value>)"."""
        tag = 'Ok' if self.isOk else 'Err'
        return f"Result::{tag}({self.value})"
434
435 # FILES
def readPickle(path :FilePath) -> Any:
    """
    Load the object stored in a .pickle file, which needs to exist at the given path.

    NOTE: unpickling can execute arbitrary code; only use this on trusted files.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : whatever object was pickled into the file.
    """
    with open(path.show(), "rb") as source:
        loaded = pickle.load(source)
    return loaded
447
def writePickle(path :FilePath, data :Any) -> None:
    """
    Pickle any data into a file created (or overwritten) at the given path.

    Args:
        path : the path to the .pickle file.
        data : the object to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as target:
        pickle.dump(data, target)
460
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Read every row of a .csv file, which needs to exist at the given path.

    Args:
        path : the path to the .csv file.
        delimiter : field separator; pass a tab character to read .tsv files with the same function.
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : the rows of the file, each one a list of the strings found between delimiters.
    """
    with open(path.show(), "r", newline = "") as source:
        rows = list(csv.reader(source, delimiter = delimiter))

    # skipHeader doubles as the slice start: True drops the first row, False keeps it.
    return rows[skipHeader:]
474
def findIdxByName(header: List[str], name: str, colName="name") -> Tuple[int, int]:
    """
    Find the indices of the 'ReactionID' column and a user-specified column name
    within the header row of a tabular file.

    Args:
        header (List[str]): The header row, as a list of column names.
        name (str): The name of the column to look for (e.g. 'GPR').
        colName (str, optional): Currently unused; kept so existing callers that
            pass it keep working. Defaults to "name".

    Returns:
        Tuple[int, int]: A tuple containing:
            - The index of the 'ReactionID' column.
            - The index of the requested column `name`.
        When a column name is repeated in the header, the index of its last occurrence is used.

    Raises:
        ValueError: If 'ReactionID' or the requested column `name` is not found in the header.

    Notes:
        Both 'ReactionID' and the requested column are mandatory for downstream processing.
    """
    # Later duplicates overwrite earlier ones, so each name maps to its last position.
    col_index = {col_name: idx for idx, col_name in enumerate(header)}

    if name not in col_index or "ReactionID" not in col_index:
        raise ValueError(f"Tabular file must contain 'ReactionID' and {name} columns.")

    return col_index["ReactionID"], col_index[name]
506
507
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parse a .svg file, which needs to exist at the given path.

    Args:
        path : the path to the .svg file.
        customErr : optional exception raised in place of the parser's own error.

    Raises:
        customErr : when provided and the file cannot be parsed.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : when parsing fails and no customErr was provided.

    Returns:
        ET.ElementTree : the parsed document.
    """
    try:
        return ET.parse(path.show())
    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        if customErr:
            raise customErr
        raise err
524
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Serialize svg data opened with lxml.etree into a file created (or overwritten) at the given path.

    Args:
        path : the path to the .svg file.
        data : the tree to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as target:
        serialized = ET.tostring(data)
        target.write(serialized)
537
538 # UI ARGUMENTS
class Bool:
    """Simple boolean CLI argument parser accepting 'true' or 'false' (case-insensitive)."""
    def __init__(self, argName :str) -> None:
        # Name of the argument, reported in the error raised for invalid input.
        self.argName = argName

    def __call__(self, s :str) -> bool:
        """Makes the instance usable as a converter function: same as check()."""
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Convert the given string to a bool.

        Args:
            s (str): the text to parse.

        Raises:
            ArgsErr: if the lower-cased text is neither "true" nor "false".

        Returns:
            bool: True for "true", False for "false".
        """
        lowered = s.lower()
        if lowered not in ("true", "false"):
            raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")
        return lowered == "true"
551
class Float:
    """Float CLI argument parser supporting NaN and None keywords (case-insensitive)."""
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        # Name of the argument, reported in the error raised for invalid input.
        self.argName = argName

    def __call__(self, s :str) -> float:
        """Makes the instance usable as a converter function: same as check()."""
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Convert the given string to a float.

        Args:
            s (str): the text to parse.

        Raises:
            ArgsErr: if the text is not accepted by float() and is not "nan"/"none" either.

        Returns:
            float: the parsed number, or NaN for the "nan"/"none" keywords.
        """
        try:
            return float(s)
        except ValueError:
            lowered = s.lower()
            if lowered in ("nan", "none"):
                return math.nan
            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
565
566 # MODELS
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths.
    """

    Recon = "Recon"
    ENGRO2 = "ENGRO2"
    ENGRO2_no_legend = "ENGRO2_no_legend"
    HMRcore = "HMRcore"
    HMRcore_no_legend = "HMRcore_no_legend"
    Custom = "Custom"

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """Raise PathErr when no path is available (a custom model was requested without a custom path)."""
        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : directory that contains the "local/pickle files/" folder.
            customPath : path used instead of the bundled file when this is the Custom model.

        Raises:
            PathErr : if this is the Custom model and no customPath was given.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : directory that contains the "local/pickle files/" folder.
            customPath : path used instead of the bundled file when this is the Custom model.

        Raises:
            PathErr : if this is the Custom model and no customPath was given.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the SVG metabolic map for this model.

        Args:
            toolDir : directory that contains the "local/svg metabolic maps/" folder.
            customPath : path used instead of the bundled file when this is the Custom model.

        Raises:
            PathErr : if this is the Custom model and no customPath was given.
            DataErr : if the map cannot be parsed.

        Returns:
            ET.ElementTree : the parsed map.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
        self.__raiseMissingPathErr(path)
        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))

    def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load the COBRA model for this enum variant (supports Custom with explicit path/extension).

        Args:
            toolDir : directory that contains the "local/models/" folder (bundled models only).
            customPath : path of the model file (Custom model only).
            customExtension : format to assume for the custom file instead of the one in customPath.

        Returns:
            cobra.Model : the loaded model.
        """
        if self is Model.Custom:
            return self.load_custom_model(customPath, customExtension)

        # Bundled models are SBML files named after the enum member.
        return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())

    def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load a COBRA model from a custom path, supporting XML, JSON, MAT, and YML (compressed or not).

        Args:
            file_path : path of the model file.
            ext : format to assume for the file; defaults to the extension of file_path.

        Raises:
            DataErr : if loading fails or the format is not one of the supported ones.

        Returns:
            cobra.Model : the loaded model.
        """
        ext = ext if ext else file_path.ext
        # Compare on the textual extension: `ext` is a FileFormat, so comparing it
        # directly with a str such as "json" is never true.
        ext_name = str(ext).lower()
        try:
            if ext_name in FileFormat.XML.value:
                return cobra.io.read_sbml_model(file_path.show())

            # (format, plain extension, loader for the uncompressed file)
            compressible = (
                (FileFormat.JSON, "json", cobra.io.load_json_model),
                (FileFormat.MAT, "mat", cobra.io.load_matlab_model),
                (FileFormat.YML, "yml", cobra.io.load_yaml_model),
            )
            for fmt, plain_ext, loader in compressible:
                if ext_name not in fmt.value:
                    continue
                if ext_name == plain_ext:
                    return loader(file_path.show())
                # Compressed files are not automatically handled by cobra
                return self.extract_model(file_path, ext, plain_ext)

        except Exception as e:
            raise DataErr(file_path, str(e)) from e

        raise DataErr(file_path,
            f"Format \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")

    def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
        """
        Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).

        For zip archives only the first entry of the archive is read.

        Args:
            file_path: File path of the model
            ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
            model_encoding: Format of the model stored inside the compressed file.

        Returns:
            cobra.Model: COBRApy model

        Raises:
            Exception: Extraction errors
        """
        from io import BytesIO  # local import: the module itself only imports StringIO

        ext_str = str(ext).lower()
        loaders = {
            "json": cobra.io.load_json_model,
            "mat": cobra.io.load_matlab_model,
            "yml": cobra.io.load_yaml_model,
        }

        try:
            if '.zip' in ext_str:
                with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
                    raw = zip_ref.read(zip_ref.namelist()[0])
            elif '.gz' in ext_str:
                with gzip.open(file_path.show(), 'rb') as gz_ref:
                    raw = gz_ref.read()
            elif '.bz2' in ext_str:
                with bz2.open(file_path.show(), 'rb') as bz2_ref:
                    raw = bz2_ref.read()
            else:
                raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")

            if model_encoding not in loaders:
                raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")

            # MATLAB files are binary and must not be decoded as text;
            # JSON and YAML are handed over as UTF-8 text streams.
            if model_encoding == "mat":
                stream = BytesIO(raw)
            else:
                stream = StringIO(raw.decode('utf-8'))
            return loaders[model_encoding](stream)

        except Exception as e:
            raise Exception(f"Error during model extraction: {str(e)}") from e

    def __str__(self) -> str:
        """Return the model name stored as the member's value."""
        return self.value
710
711