comparison utils/general_utils.py @ 283:813439d60f85 draft

Uploaded
author luca_milaz
date Mon, 08 Jul 2024 22:18:11 +0000
parents
children
comparison
equal deleted inserted replaced
282:d385c4df70c3 283:813439d60f85
1 import math
2 import re
3 import sys
4 import csv
5 import pickle
6 import lxml.etree as ET
7
8 from enum import Enum
9 from itertools import count
10 from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
11
12 import pandas as pd
13
# FILES
class FileFormat(Enum):
    """
    Enumerates the file formats handled by the tools, so that data can be saved or loaded
    differently depending on the extension. The value of each variant is a tuple holding
    every extension string recognized for that format.
    """
    DAT = ("dat",) # the extension galaxy gives to all of your files
    CSV = ("csv",) # how most editable input data is written
    TSV = ("tsv",) # how most editable input data is ACTUALLY written TODO: more support needed

    SVG = ("svg",) # how most metabolic maps are written
    PNG = ("png",) # common output format for images (such as metabolic maps)
    PDF = ("pdf",) # another common output format for images, required in publications

    XML = ("xml",) # one of the main formats cobra models appear in
    JSON = ("json",) # the other one

    PICKLE = ("pickle", "pk", "p") # how all runtime data structures are saved
    #TODO: there's no point in supporting many pickle extensions internally. The issue will never be
    # solved for user-uploaded files, and those are saved as .dat by galaxy anyway, so it only matters
    # that these 3 names CAN be recognized as valid pickle extensions. An internal standard must however
    # be agreed upon and used exclusively, otherwise constructing usable paths becomes a nightmare.

    @classmethod
    def fromExt(cls, ext :str) -> "FileFormat":
        """
        Converts a file extension string to a FileFormat instance.

        Args:
            ext : The file extension as a string, matched without regard to case.

        Raises:
            ValueErr : if no variant is named after, or lists, the given extension.

        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        # First attempt: the uppercased extension is exactly the name of a variant ("csv" -> CSV).
        upperExt = ext.upper()
        try: return cls[upperExt]
        except KeyError: pass

        # Second attempt: the extension is one of the spellings listed in a variant's value ("pk" -> PICKLE).
        lowerExt = upperExt.lower()
        found = next((member for member in cls if lowerExt in member.value), None)
        if found is not None: return found

        raise ValueErr("ext", "a valid FileFormat file extension", ext)

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the LAST extension listed in the variant's value (so "p" for PICKLE).
        """
        *_, lastExt = self.value #TODO: fix, picking the last entry only exists because of the pickle aliases
        return lastExt
63
class FilePath():
    """
    Represents a file path, split into an optional prefix, a file name and an extension.
    It is an attempt to standardize file-related operations: any process requesting a file
    path is expected to ask for a value of this type.
    """
    def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
        """
        (Private) Initializes an instance of FilePath.

        Args:
            filePath : the end of the path, containing the file name (without extension).
            ext : the file's extension.
            prefix : anything before filePath, if the last '/' isn't there it's added by the code.

        Returns:
            None : practically, a FilePath instance.
        """
        # A non-empty prefix always ends with '/', so show() can simply concatenate the parts.
        if prefix and not prefix.endswith('/'): prefix = prefix + '/'

        self.prefix = prefix
        self.filePath = filePath
        self.ext = ext

    @classmethod
    def fromStrPath(cls, path :str) -> "FilePath":
        """
        Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.

        Args:
            path : the string containing the path

        Raises:
            PathErr : if the provided string doesn't represent a valid path.

        Returns:
            FilePath : the constructed instance.
        """
        # This method is often used to build FilePath instances from ARGS UI arguments. Those arguments *should*
        # always be correct paths and could be used as raw strings, but most (if not all) functions working with
        # file paths specifically request FilePath objects, which is a good thing in every case but this one.
        # The outcome is that a string gets parsed into a FilePath just so the function accepts it, and show() is
        # then called right away to get the string back and open the file it points to.
        # TODO: this indicates that arguments which are file paths SHOULD BE OF TYPE FilePath, which would ENSURE
        # their correctness when modifying the UI and avoid the pointless back-and-forth.
        parsed = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (parsed and parsed["name"] and parsed["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        # The prefix group is optional in the pattern: when absent it's replaced by an empty string.
        return cls(parsed["name"], FileFormat.fromExt(parsed["ext"]), prefix = parsed["prefix"] or "")

    def show(self) -> str:
        """
        Shows the path as a string.

        Returns:
            str : prefix, file name and extension joined as "<prefix><filePath>.<ext>".
        """
        fileName = f"{self.filePath}.{self.ext}"
        return f"{self.prefix}{fileName}"

    def __str__(self) -> str: return self.show()
125
# ERRORS
def terminate(msg :str) -> None:
    """
    Stops the script by calling sys.exit with an error message.

    Args:
        msg (str): The error message, appended to the "Execution aborted: " prefix.

    Returns:
        None (in practice the call does not return, since sys.exit raises SystemExit).
    """
    abortMessage = f"Execution aborted: {msg}\n"
    sys.exit(abortMessage)
138
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Appends a warning message to an output log file. The final period and a newline are added
    by the function. Nothing is printed to the console here.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately read back (beware relative expensive operation, log with caution).

    Returns:
        None
    """
    # Building the path only to turn it back into a string looks useless, but it validates the path
    # here and spares callers from repeating that check. Logging by writing to a file is expected to
    # be computationally expensive anyway, so this also discourages logging mindlessly whenever
    # something comes up: log at the very end and tell the user everything that went wrong.
    # If you don't like it: implement a persistent runtime buffer that gets dumped to the file only
    # at the end of the program's execution.
    validatedPath = FilePath.fromStrPath(loggerPath).show()
    with open(validatedPath, 'a') as logFile:
        logFile.write(f"{msg}.\n")
159
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
    Subclasses customize the name shown in the string representation by overriding errName.
    """
    __idGenerator = count() # shared by every CustomErr (and subclass) instance, advanced once per construction
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg = msg
        self.details = details

        # The generated id is always consumed; the explicit code only wins when it's the larger of the two.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, logging a warning message before doing so.

        Args:
            loggerPath (str): path of the log file; when empty (the default) nothing is logged.

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath: logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script, using this error's string representation as the message.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: A string in the form "<errName> #<id>: <msg>, <details>.".
        """
        # errName is looked up through the instance, so that subclasses overriding it ("Args Error",
        # "Path Error", ...) are reported under their own name rather than always as "Custom Error".
        return f"{self.errName} #{self.id}: {self.msg}, {self.details}."
213
class ArgsErr(CustomErr):
    """
    CustomErr subclass for UI arguments errors: reports which argument was wrong, what was
    expected and what was received instead.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        # msg is forwarded as the "details" of the base error, the summary below becomes its message.
        summary = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(summary, msg)
221
class DataErr(CustomErr):
    """
    CustomErr subclass for data formatting errors: reports the file holding the malformed data.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        # msg is forwarded as the "details" of the base error, the summary below becomes its message.
        summary = f"file \"{fileName}\" contains malformed data"
        super().__init__(summary, msg)
229
class PathErr(CustomErr):
    """
    CustomErr subclass for filepath formatting errors: reports the offending path.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        # The path is only interpolated in the message, so it's shown through its str representation.
        summary = f"path \"{path}\" is invalid"
        super().__init__(summary, msg)
237
class ValueErr(CustomErr):
    """
    CustomErr subclass for any value error: reports what a value was supposed to be and what
    it turned out to be instead.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        # The quoted name (with its trailing space) only appears when a non-empty name is given.
        quotedName = f"\"{valueName}\" " if valueName else ""
        summary = "value " + quotedName + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(summary, msg)
245
# RESULT
T = TypeVar('T')
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    # NOTE: the docstring above must stay the first statement of the class body, otherwise
    # it is not picked up as Result.__doc__.
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        (Private) Initializes an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.isOk  = isOk
        self.isErr = not isOk
        self.value = value

    @classmethod
    def Ok(cls, value :T) -> "Result":
        """
        Constructs a new Result instance with a successful operation.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        # Built through cls so that a subclass calling this factory gets an instance of itself.
        return cls(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Constructs a new Result instance with a failed operation.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        # Built through cls so that a subclass calling this factory gets an instance of itself.
        return cls(value, isOk = False)

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        return self.value if self.isOk else default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise err

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics the returned result instance will be of the error kind.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr: return self
        # Any Exception raised by the mapper is captured and stored as the error of the new Result.
        try: return Result.Ok(mapper(self.value))
        except Exception as e: return Result.Err(e)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics this method does as well.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk: return self
        return Result.Err(mapper(self.value))

    def __str__(self) -> str:
        """
        (Private) Returns a string in the form "Result::Ok(<value>)" or "Result::Err(<value>)".
        """
        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
382
# FILES
def read_dataset(path :"FilePath", datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
    """
    Reads a .csv or .tsv file and returns it as a Pandas DataFrame. The file is first read as
    tab-separated without a header row; only if that read raises, it's read again as comma-separated
    with the first row used as header.

    Args:
        path : the path to the dataset file.
        datasetName : the name of the dataset, only used in error messages.

    Raises:
        DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
        it has less than 2 columns.

    Returns:
        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
    """
    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
    # hard to implement anything better. Also, this function's name advertises it as a dataset-specific operation and
    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
    try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
    # Only ordinary errors trigger the comma-separated retry: a bare "except:" would also swallow
    # KeyboardInterrupt and SystemExit, silently retrying when the user is trying to stop the program.
    except Exception:
        try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
        except Exception as err:
            # Chained explicitly so the original pandas / OS error stays available as __cause__.
            raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") from err

    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
    return dataset
412
def readPickle(path :FilePath) -> Any:
    """
    Loads the contents of a .pickle file, which needs to exist at the given path.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : whatever object was stored in the pickle file.
    """
    # NOTE(review): pickle.load can execute arbitrary code while loading, so this is only safe
    # on trusted files - confirm whether user-provided pickles can ever reach this function.
    with open(path.show(), "rb") as pickleFile:
        contents = pickle.load(pickleFile)

    return contents
424
def writePickle(path :FilePath, data :Any) -> None:
    """
    Dumps any data into a .pickle file, created (or overwritten) at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    targetPath = path.show()
    with open(targetPath, "wb") as pickleFile:
        pickle.dump(data, pickleFile)
437
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Reads the contents of a .csv file, which needs to exist at the given path.

    Args:
        path : the path to the .csv file.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally
        separated by the delimiter.
    """
    with open(path.show(), "r", newline = "") as csvFile:
        rows = list(csv.reader(csvFile, delimiter = delimiter))

    # skipHeader doubles as the slice start: True drops the first row, False keeps every row.
    return rows[skipHeader:]
451
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parses the contents of a .svg file, which needs to exist at the given path.

    Args:
        path : the path to the .svg file.
        customErr : error raised in place of the parser's own one when parsing fails, if provided.

    Raises:
        customErr : if provided and the file can't be parsed (XML syntax or schema parse error).
        ET.XMLSyntaxError, ET.XMLSchemaParseError : if parsing fails and no customErr was provided.

    Returns:
        ET.ElementTree : the parsed tree, as returned by lxml's ET.parse.
    """
    try:
        svgTree = ET.parse(path.show())

    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        # The caller's error, when there is one, replaces the parser's.
        if customErr: raise customErr
        raise err

    return svgTree
468
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Saves svg data opened with lxml.etree in a .svg file, created (or overwritten) at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    # The tree is serialized to bytes first, hence the file being opened in binary mode.
    serialized = ET.tostring(data)
    with open(path.show(), "wb") as svgFile:
        svgFile.write(serialized)
481
# UI ARGUMENTS
class Bool:
    """
    Callable parser for boolean UI arguments: instances turn the strings "true" and "false"
    (not case sensitive) into the corresponding bool.
    """
    def __init__(self, argName :str) -> None:
        """
        Args:
            argName : name of the argument being parsed, only used in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> bool: return self.check(s)

    def check(self, s :str) -> bool:
        """
        Parses a boolean string.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if the lowercased string is neither "true" nor "false".

        Returns:
            bool : the parsed value.
        """
        lowered = s.lower()
        recognized = { "true" : True, "false" : False }
        if lowered in recognized: return recognized[lowered]

        # The error reports the lowercased string, not the original one.
        raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")
494
class Float:
    """
    Callable parser for numeric UI arguments and dataset values: instances turn numeric strings
    into floats, and the strings "None" and "NaN" (not case sensitive) into math.nan.
    """
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        """
        Args:
            argName : name of the argument being parsed, only used in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> float: return self.check(s)

    def check(self, s :str) -> float:
        """
        Parses a numeric string.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if float() rejects the string and it's neither "nan" nor "none" once lowercased.

        Returns:
            float : the parsed value, math.nan for the "none" / "nan" spellings.
        """
        try: return float(s)
        except ValueError:
            # Reached when float() can't parse the string, which is how "none" ends up as NaN.
            lowered = s.lower()
            if lowered in ("nan", "none"): return math.nan
            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
507
# MODELS
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths, so a path must be supplied by the caller when retrieving their files.
    """

    Recon   = "Recon"
    ENGRO2  = "ENGRO2"
    HMRcore = "HMRcore"
    Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """
        (Private) Raises a PathErr when no path is available, does nothing otherwise.
        """
        if path: return
        raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : the tool's directory, under which the local pickle files are looked up.
            customPath : the path to read from, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is provided.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        if self is Model.Custom: path = customPath
        else: path = FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")

        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : the tool's directory, under which the local pickle files are looked up.
            customPath : the path to read from, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is provided.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        if self is Model.Custom: path = customPath
        else: path = FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")

        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the svg metabolic map for this model.

        Args:
            toolDir : the tool's directory, under which the local svg maps are looked up.
            customPath : the path to read from, only used (and required) by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is provided.
            DataErr : if the svg file can't be parsed.

        Returns:
            ET.ElementTree : the parsed map.
        """
        if self is Model.Custom: path = customPath
        else: path = FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")

        self.__raiseMissingPathErr(path)

        # The error is built before reading, readSvg only raises it if parsing actually fails.
        wrongFormatErr = DataErr(path, "custom map in wrong format")
        return readSvg(path, customErr = wrongFormatErr)

    def __str__(self) -> str: return self.value