comparison COBRAxy/utils/general_utils.py @ 4:41f35c2f0c7b draft

Uploaded
author luca_milaz
date Wed, 18 Sep 2024 10:59:10 +0000
parents
children
comparison
equal deleted inserted replaced
3:1f3ac6fd9867 4:41f35c2f0c7b
1 import math
2 import re
3 import sys
4 import csv
5 import pickle
6 import lxml.etree as ET
7
8 from enum import Enum
9 from itertools import count
10 from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
11
12 import pandas as pd
13 import cobra
14
15 # FILES
class FileFormat(Enum):
    """
    Enumerates the file formats known to the tools. Each variant's value is a tuple holding every
    extension string accepted for that format.
    """
    DAT = ("dat",) # this is how galaxy treats all your files!
    CSV = ("csv",) # this is how most editable input data is written
    TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!

    SVG = ("svg",) # this is how most metabolic maps are written
    PNG = ("png",) # this is a common output format for images (such as metabolic maps)
    PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.

    XML = ("xml",) # this is one main way cobra models appear in
    JSON = ("json",) # this is the other

    PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
    #TODO: we're in a pickle (ba dum tss), there's no point in supporting many extensions internally. The
    # issue will never be solved for user-uploaded files and those are saved as .dat by galaxy anyway so it
    # doesn't matter as long as we CAN recognize these 3 names as valid pickle extensions. We must however
    # agree on an internal standard and use only that one, otherwise constructing usable paths becomes a nightmare.

    @classmethod
    def fromExt(cls, ext :str) -> "FileFormat":
        """
        Looks up the FileFormat variant matching a file extension.

        Args:
            ext : the extension to look up, without the leading dot (not case sensitive).

        Raises:
            ValueErr : if no variant is named like the extension or lists it among its values.

        Returns:
            FileFormat : the matching variant.
        """
        # First attempt: the upper-cased extension is the variant's own name (e.g. "csv" -> CSV).
        upperName = ext.upper()
        byName = FileFormat.__members__.get(upperName)
        if byName is not None:
            return byName

        # Second attempt: the lower-cased extension is one of the aliases in a variant's tuple (e.g. "pk").
        lowerExt = upperName.lower()
        byAlias = next((candidate for candidate in cls if lowerExt in candidate.value), None)
        if byAlias is None:
            raise ValueErr("ext", "a valid FileFormat file extension", ext)

        return byAlias

    def __str__(self) -> str:
        """
        (Private) String form of the variant, useful with argparse.

        Returns:
            str : the LAST extension listed in the variant's value tuple.
        """
        extensions = self.value
        return extensions[-1] #TODO: fix, it's the dumb pickle thing
64
class FilePath():
    """
    Structured representation of a file path (prefix + file name + extension). Functions that need a
    path are expected to request values of this type, standardizing file-related operations.
    """
    def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
        """
        (Private) Initializes an instance of FilePath.

        Args:
            filePath : the end of the path, containing the file name (without extension).
            ext : the file's extension.
            prefix : anything that comes before filePath; a trailing '/' is appended when missing.

        Returns:
            None : practically, a FilePath instance.
        """
        # An empty prefix is left untouched so that show() doesn't produce a path rooted at '/'.
        if prefix and not prefix.endswith('/'):
            prefix = prefix + '/'

        self.prefix = prefix
        self.filePath = filePath
        self.ext = ext

    @classmethod
    def fromStrPath(cls, path :str) -> "FilePath":
        """
        Factory method that parses a raw string into a FilePath instance.

        Args:
            path : the string containing the path.

        Raises:
            PathErr : if no file name or no extension can be recognized in the string.

        Returns:
            FilePath : the constructed instance.
        """
        # This method is often used to construct FilePath instances from ARGS UI arguments. These arguments *should*
        # always be correct paths and could be used as raw strings, however most if not all functions that work with
        # file paths request the FilePath objects specifically, which is a very good thing in any case other than this.
        # What ends up happening is we spend time parsing a string into a FilePath so that the function accepts it, only
        # to call show() immediately to bring back the string and open the file it points to.
        # TODO: this is an indication that the arguments SHOULD BE OF TYPE FilePath if they are filepaths, this ENSURES
        # their correctness when modifying the UI and avoids the pointless back-and-forth.
        parsed = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (parsed and parsed["name"] and parsed["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        # The prefix group is optional in the pattern: it is None when the path has no folder part.
        folder = parsed["prefix"] or ""
        fileFormat = FileFormat.fromExt(parsed["ext"])
        return cls(parsed["name"], fileFormat, prefix = folder)

    def show(self) -> str:
        """
        Renders the path as a string.

        Returns:
            str : prefix, file name and extension joined as "<prefix><name>.<ext>".
        """
        rendered = f"{self.prefix}{self.filePath}.{self.ext}"
        return rendered

    def __str__(self) -> str:
        """
        (Private) Same as show().
        """
        return self.show()
126
127 # ERRORS
def terminate(msg :str) -> None:
    """
    Stops the script, reporting an error message through sys.exit.

    Args:
        msg (str): The error message to be displayed.

    Returns:
        None
    """
    abortMessage = f"Execution aborted: {msg}\n"
    sys.exit(abortMessage)
139
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Appends a warning message to an output log file. A final period and a newline are added
    by the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately read back (beware relative expensive operation, log with caution).

    Returns:
        None
    """
    # Building the path and reading it straight back looks redundant, but it validates the string
    # once here instead of in every caller. Writing to a file is meant to be a costly way of logging
    # anyway, which discourages logging at every hiccup: prefer collecting what went wrong and
    # reporting it at the very end. If you don't like it: implement a persistent runtime buffer that
    # gets dumped to the file only at the end of the program's execution.
    validatedPath = FilePath.fromStrPath(loggerPath).show()
    with open(validatedPath, 'a') as log:
        log.write(f"{msg}.\n")
160
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
    Subclasses customize the name shown in the error text by overriding the errName class attribute.
    """
    __idGenerator = count() # shared by every CustomErr (subclasses included): yields progressive ids
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg = msg
        self.details = details

        # The generator is advanced on every construction; the explicit code only wins when it is
        # larger than the generated id.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, logging a warning message before doing so when a
        logger path is given.

        Args:
            loggerPath (str): Path of the log file; nothing is logged when empty. Defaults to "".

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath: logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script, using this error's string form as the message.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: A string representing the current CustomErr instance.
        """
        # errName is read through the instance so that subclasses overriding it are reported with
        # their own name rather than always as "Custom Error".
        return f"{self.errName} #{self.id}: {self.msg}, {self.details}."
214
class ArgsErr(CustomErr):
    """
    CustomErr subclass raised when a UI argument doesn't hold the expected kind of value.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ArgsErr.

        Args:
            argName : name of the offending argument.
            expected : description of what the argument should have been.
            actual : what was received instead.
            msg : further details about the error.
        """
        headline = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(headline, details = msg)
222
class DataErr(CustomErr):
    """
    CustomErr subclass raised when a file's contents are not formatted as expected.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of DataErr.

        Args:
            fileName : name of the file holding the malformed data.
            msg : further details about the error.
        """
        headline = f"file \"{fileName}\" contains malformed data"
        super().__init__(headline, details = msg)
230
class PathErr(CustomErr):
    """
    CustomErr subclass raised when a file path is not formatted as expected.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of PathErr.

        Args:
            path : the offending path, interpolated into the error message.
            msg : further details about the error.
        """
        headline = f"path \"{path}\" is invalid"
        super().__init__(headline, details = msg)
238
class ValueErr(CustomErr):
    """
    CustomErr subclass for any value that differs from what was expected.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ValueErr.

        Args:
            valueName : name of the offending value, left out of the message when empty.
            expected : description of what the value should have been.
            actual : what was received instead.
            msg : further details about the error.
        """
        # The quoted name (and its trailing space) only appears when a name was actually given.
        quotedName = f"\"{valueName}\" " if valueName else ""
        headline = "value " + quotedName + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(headline, details = msg)
246
247 # RESULT
T = TypeVar('T')
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        (Private) Initializes an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.isOk = isOk
        self.isErr = not isOk
        self.value = value

    @classmethod
    def Ok(cls, value :T) -> "Result":
        """
        Constructs a new Result instance with a successful operation.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        return cls(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Constructs a new Result instance with a failed operation.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        return cls(value, isOk = False)

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        return self.value if self.isOk else default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise err

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics the returned result instance will be of the error kind.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr: return self
        # Any Exception raised by the mapper is captured and returned as the Err value.
        try: return Result.Ok(mapper(self.value))
        except Exception as e: return Result.Err(e)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics this method does as well.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk: return self
        return Result.Err(mapper(self.value))

    def __str__(self):
        """
        (Private) Returns "Result::Ok(<value>)" or "Result::Err(<value>)" depending on the outcome.
        """
        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
383
384 # FILES
def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
    """
    Reads a .csv or .tsv file and returns it as a Pandas DataFrame.

    Args:
        path : the path to the dataset file.
        datasetName : the name of the dataset, only used in error messages.

    Raises:
        DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
        it has less than 2 columns.

    Returns:
        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
    """
    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
    # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
    filePath = path.show()

    # First attempt: tab-separated, no header row. Only Exception subclasses trigger the fallback, so
    # KeyboardInterrupt and SystemExit are no longer swallowed here.
    # NOTE(review): the comma-separated fallback runs only when this first read RAISES; a file that pandas
    # happens to parse as a single tab-separated column never reaches it - verify this is intended.
    try: dataset = pd.read_csv(filePath, sep = '\t', header = None, engine = "python")
    except Exception:
        # Second attempt: comma-separated, first row used as header.
        try: dataset = pd.read_csv(filePath, sep = ',', header = 0, engine = "python")
        except Exception as err:
            raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") from err

    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
    return dataset
413
def readPickle(path :FilePath) -> Any:
    """
    Loads whatever object is stored in the .pickle file found at the given path.

    Args:
        path : the path to the .pickle file, which must already exist.

    Returns:
        Any : the unpickled data, could be anything.
    """
    # NOTE: unpickling executes arbitrary code, only open files coming from a trusted source.
    with open(path.show(), "rb") as pickleFile:
        content = pickle.load(pickleFile)

    return content
425
def writePickle(path :FilePath, data :Any) -> None:
    """
    Pickles any data into a file created (or overwritten) at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    targetPath = path.show()
    with open(targetPath, "wb") as pickleFile:
        pickle.dump(data, pickleFile)
438
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Parses the delimited text file found at the given path into a list of rows.

    Args:
        path : the path to the .csv file, which must already exist.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally
        separated by the delimiter.
    """
    with open(path.show(), "r", newline = "") as csvFile:
        allRows = list(csv.reader(csvFile, delimiter = delimiter))

    # The flag doubles as the slice start: True drops the first row, False keeps every row.
    return allRows[skipHeader:]
452
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parses the .svg file found at the given path with lxml.etree.

    Args:
        path : the path to the .svg file, which must already exist.
        customErr : error to raise in place of the parser's own one when the file is malformed.

    Raises:
        customErr : if given and the parser reports a syntax or schema parse error.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : the parser's own error, when no customErr is given.

    Returns:
        ET.ElementTree : the parsed document tree.
    """
    try:
        svgTree = ET.parse(path.show())
    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        if customErr:
            raise customErr
        raise err

    return svgTree
469
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Serializes svg data opened with lxml.etree into a file created (or overwritten) at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    targetPath = path.show()
    with open(targetPath, "wb") as svgFile:
        serialized = ET.tostring(data)
        svgFile.write(serialized)
482
483 # UI ARGUMENTS
class Bool:
    """
    Callable parser for boolean UI arguments given as strings.
    """
    def __init__(self, argName :str) -> None:
        """
        Args:
            argName : name of the argument being parsed, reported in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> bool:
        """
        Same as check(), lets an instance be used directly as a conversion function.
        """
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Converts "true" / "false" (not case sensitive) to the corresponding bool.

        Args:
            s : the string to convert.

        Raises:
            ArgsErr : if the string is neither "true" nor "false".

        Returns:
            bool : the parsed value.
        """
        lowered = s.lower()
        parsed = {"true": True, "false": False}.get(lowered)
        if parsed is None:
            raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")

        return parsed
495
class Float:
    """
    Callable parser for numeric values given as strings, tolerant of missing values.
    """
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        """
        Args:
            argName : name of the argument being parsed, reported in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> float:
        """
        Same as check(), lets an instance be used directly as a conversion function.
        """
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Converts a string to float, mapping "nan" and "none" (not case sensitive) to math.nan.

        Args:
            s : the string to convert.

        Raises:
            ArgsErr : if the string is neither numeric nor one of the accepted missing-value names.

        Returns:
            float : the parsed value.
        """
        try:
            return float(s)
        except ValueError:
            # Not something float() understands: accept the missing-value spellings, reject the rest.
            lowered = s.lower()
            if lowered in ("nan", "none"):
                return math.nan

            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
508
509 # MODELS
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths.
    """

    Recon = "Recon"
    ENGRO2 = "ENGRO2"
    ENGRO2_no_legend = "ENGRO2_no_legend"
    HMRcore = "HMRcore"
    HMRcore_no_legend = "HMRcore_no_legend"
    Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """
        (Private) Raises a PathErr when no path is available.

        Args:
            path : the path about to be used, None when the caller didn't provide a custom one.

        Raises:
            PathErr : if path is missing.
        """
        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : the tool's directory, local files are searched under "<toolDir>/local/pickle files/".
            customPath : path to the rules file, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : the tool's directory, local files are searched under "<toolDir>/local/pickle files/".
            customPath : path to the translator file, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the svg metabolic map for this model.

        Args:
            toolDir : the tool's directory, local files are searched under "<toolDir>/local/svg metabolic maps/".
            customPath : path to the map file, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the svg parser rejects the map.

        Returns:
            ET.ElementTree : the parsed map.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
        self.__raiseMissingPathErr(path)
        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))

    def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FileFormat]=None)->cobra.Model:
        """
        Open the COBRA model for this variant.

        Args:
            toolDir : the tool's directory, local models are searched under "<toolDir>/local/models/".
            customPath : path to the model file, only used (and required) by the Custom variant.
            customExtension : format to read the custom file as, defaults to the extension in customPath.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the custom model can't be loaded or its format isn't supported.

        Returns:
            cobra.Model : the loaded model.
        """
        if self is Model.Custom:
            # Same guard used by the other getters: without it a missing path would only surface
            # later as an error on None instead of a clear PathErr.
            self.__raiseMissingPathErr(customPath)
            return self.load_custom_model(customPath, customExtension)

        return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())

    def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
        """
        Loads a user-provided COBRA model from an XML (SBML) or JSON file.

        Args:
            file_path : the path to the model file.
            ext : format to read the file as, defaults to the extension stored in file_path.

        Raises:
            DataErr : if cobra fails to load the file or the format is neither XML nor JSON.

        Returns:
            cobra.Model : the loaded model.
        """
        ext = ext if ext else file_path.ext
        try:
            if ext is FileFormat.XML:
                return cobra.io.read_sbml_model(file_path.show())

            if ext is FileFormat.JSON:
                return cobra.io.load_json_model(file_path.show())

        except Exception as e: raise DataErr(file_path, str(e)) from e

        # Report the format that was actually tested, which differs from file_path.ext when the
        # caller passed an explicit extension.
        raise DataErr(file_path,
            f"Format \"{ext}\" is not recognized, only JSON and XML files are supported.")

    def __str__(self) -> str:
        """
        (Private) Returns the variant's value, i.e. the model's name.
        """
        return self.value