comparison COBRAxy/utils/general_utils.py @ 240:63f5078627a9 draft

Uploaded
author francesco_lapi
date Mon, 13 Jan 2025 10:01:40 +0000
parents
children
comparison
equal deleted inserted replaced
239:7bd33d296319 240:63f5078627a9
1 import math
2 import re
3 import sys
4 import csv
5 import pickle
6 import lxml.etree as ET
7
8 from enum import Enum
9 from itertools import count
10 from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
11
12 import pandas as pd
13 import cobra
14
15 # FILES
class FileFormat(Enum):
    """
    Enumerates the file formats the tools know about, so that data can be conditionally saved in a
    different format. The value of each variant is a tuple holding every extension string accepted for it.
    """
    DAT = ("dat",) # galaxy treats every one of your files like this!
    CSV = ("csv",) # the format most editable input data is written in
    TSV = ("tsv",) # the format most editable input data is ACTUALLY written in TODO:more support pls!!

    SVG = ("svg",) # the format most metabolic maps are written in
    PNG = ("png",) # common output format for images (such as metabolic maps)
    PDF = ("pdf",) # another common output format for images, required in publications.

    XML = ("xml",) # one of the two main formats cobra models appear in
    JSON = ("json",) # the other one

    TXT = ("txt",) # the format most output data is written in

    PICKLE = ("pickle", "pk", "p") # the format all runtime data structures are saved in
    #TODO: we're in a pickle (ba dum tss), there's no point in supporting many extensions internally. The
    # issue will never be solved for user-uploaded files and those are saved as .dat by galaxy anyway so it
    # doesn't matter as long as we CAN recognize these 3 names as valid pickle extensions. We must however
    # agree on an internal standard and use only that one, otherwise constructing usable paths becomes a nightmare.
    @classmethod
    def fromExt(cls, ext :str) -> "FileFormat":
        """
        Resolves a file extension string to the matching FileFormat variant.

        The upper-cased extension is first looked up among the variant names, then its lower-cased
        form is searched inside each variant's tuple of accepted extensions, in declaration order.

        Args:
            ext : The file extension as a string (without the leading dot).

        Raises:
            ValueErr : if no variant matches the extension.

        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        upperName = ext.upper()
        byName = FileFormat.__members__.get(upperName)
        if byName is not None: return byName

        lowerName = upperName.lower()
        byExtension = next((member for member in cls if lowerName in member.value), None)
        if byExtension is not None: return byExtension

        raise ValueErr("ext", "a valid FileFormat file extension", ext)

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the LAST extension listed in the variant's value tuple.
        """
        *_, lastExtension = self.value #TODO: fix, it's the dumb pickle thing
        return lastExtension
66
class FilePath():
    """
    Represents a file path split into prefix, file name and extension. View this as an attempt to
    standardize file-related operations by expecting values of this type in any process requesting a file path.
    """
    def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
        """
        (Private) Initializes an instance of FilePath.

        Args:
            filePath : the end of the path, containing the file name (without extension).
            ext : the file's extension.
            prefix : anything before filePath, if the last '/' isn't there it's added by the code.

        Returns:
            None : practically, a FilePath instance.
        """
        self.ext = ext
        self.filePath = filePath

        # a non-empty prefix is normalized so that it always ends with exactly the separator it needs
        needsSeparator = bool(prefix) and prefix[-1] != '/'
        self.prefix = prefix + '/' if needsSeparator else prefix

    @classmethod
    def fromStrPath(cls, path :str) -> "FilePath":
        """
        Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.

        Args:
            path : the string containing the path

        Raises:
            PathErr : if the provided string doesn't represent a valid path (missing name or extension).

        Returns:
            FilePath : the constructed instance.
        """
        # This factory is mostly fed raw strings coming from the ARGS UI arguments. Those *should* already be
        # correct paths, yet nearly every function dealing with files asks for a FilePath, so the string gets
        # parsed here only for show() to be called right after in order to open the file it points to.
        # TODO: this is an indication that the arguments SHOULD BE OF TYPE FilePath if they are filepaths, this ENSURES
        # their correctness when modifying the UI and avoids the pointless back-and-forth.
        match = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (match and match["name"] and match["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        return cls(match["name"], FileFormat.fromExt(match["ext"]), prefix = match["prefix"] or "")

    def show(self) -> str:
        """
        Shows the path as a string, in the form "<prefix><filePath>.<ext>".

        Returns:
            str : the path shown as a string.
        """
        return f"{self.prefix}{self.filePath}.{self.ext}"

    def __str__(self) -> str:
        """
        (Private) same as show().

        Returns:
            str : the path shown as a string.
        """
        return self.show()
128
129 # ERRORS
def terminate(msg :str) -> None:
    """
    Stops the script through sys.exit, reporting an error message prefixed by "Execution aborted: ".

    Args:
        msg (str): The error message to be displayed.

    Returns:
        None : in practice the call never returns, since sys.exit raises SystemExit.
    """
    abortMessage = f"Execution aborted: {msg}\n"
    sys.exit(abortMessage)
141
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Appends a warning message to an output log file. The final period and a newline are added
    by the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately read back (beware relative expensive operation, log with caution).

    Raises:
        whatever FilePath.fromStrPath raises when the given string can't be parsed as a path.

    Returns:
        None
    """
    # Parsing the string into a FilePath just to turn it back into a string looks pointless, but it
    # validates the path in one place instead of at every call site. Writing to a file is expensive
    # anyway, which should discourage logging at every occasion: collect what went wrong and report it
    # at the very end. If you don't like it: implement a persistent runtime buffer that gets dumped to
    # the file only at the end of the program's execution.
    validatedPath = FilePath.fromStrPath(loggerPath).show()
    with open(validatedPath, 'a') as log:
        log.write(f"{msg}.\n")
162
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
    Subclasses customize the label shown in the string representation by overriding `errName`.
    """
    __idGenerator = count() # shared by CustomErr and all of its subclasses, advances at every instantiation
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg = msg
        self.details = details

        # the id is the greater between the explicit code and the next value of the shared counter,
        # so an explicit code only wins when it's at least as large as the generated one.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, logging a warning message before doing so.

        Args:
            loggerPath (str): path of the log file, nothing is logged when it's empty. Defaults to "".

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath: logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script, using this error's string representation as the message.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: A string representing the current CustomErr instance.
        """
        # errName is read through the instance so that subclasses overriding it ("Args Error",
        # "Path Error", ...) are labelled correctly instead of always showing up as "Custom Error".
        return f"{self.errName} #{self.id}: {self.msg}, {self.details}."
216
class ArgsErr(CustomErr):
    """
    CustomErr subclass for UI arguments errors.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ArgsErr.

        Args:
            argName : name of the offending argument.
            expected : description of what the argument was expected to be.
            actual : what was received instead.
            msg : further details, forwarded to CustomErr as its details.

        Returns:
            None : practically, an ArgsErr instance.
        """
        description = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(description, msg)
224
class DataErr(CustomErr):
    """
    CustomErr subclass for data formatting errors.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of DataErr.

        Args:
            fileName : name of the file holding the malformed data, interpolated in the error message.
            msg : further details, forwarded to CustomErr as its details.

        Returns:
            None : practically, a DataErr instance.
        """
        description = f"file \"{fileName}\" contains malformed data"
        super().__init__(description, msg)
232
class PathErr(CustomErr):
    """
    CustomErr subclass for filepath formatting errors.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of PathErr.

        Args:
            path : the invalid path, interpolated in the error message through its string representation.
            msg : further details, forwarded to CustomErr as its details.

        Returns:
            None : practically, a PathErr instance.
        """
        description = f"path \"{path}\" is invalid"
        super().__init__(description, msg)
240
class ValueErr(CustomErr):
    """
    CustomErr subclass for any value error.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ValueErr.

        Args:
            valueName : name of the offending value, left out of the message when empty.
            expected : description of what the value was supposed to be.
            actual : what was received instead.
            msg : further details, forwarded to CustomErr as its details.

        Returns:
            None : practically, a ValueErr instance.
        """
        # the quoted name (and its trailing space) only appears when a name was actually given
        quotedName = f"\"{valueName}\" " if valueName else ""
        description = "value " + quotedName + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(description, msg)
248
249 # RESULT
T = TypeVar('T')
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    # NOTE: the docstring above must stay the first statement of the class body, otherwise python
    # doesn't treat it as the class docstring.
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        (Private) Initializes an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.isOk = isOk
        self.isErr = not isOk
        self.value = value

    @classmethod
    def Ok(cls, value :T) -> "Result":
        """
        Constructs a new Result instance with a successful operation.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        return cls(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Constructs a new Result instance with a failed operation.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        return cls(value, isOk = False)

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        return self.value if self.isOk else default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise err

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics the returned result instance will be of the error kind,
        holding the exception raised by the mapper.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr: return self
        try: return Result.Ok(mapper(self.value))
        except Exception as e: return Result.Err(e)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function panics this method does as well.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk: return self
        return Result.Err(mapper(self.value))

    def __str__(self):
        """
        (Private) Returns a string representing the current Result instance.

        Returns:
            str: "Result::Ok(<value>)" or "Result::Err(<value>)" depending on the outcome.
        """
        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
385
386 # FILES
def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
    """
    Reads a .csv or .tsv file and returns it as a Pandas DataFrame. The file is first parsed as
    tab-separated without a header row; only if that attempt fails it's parsed again as comma-separated
    with the first row used as header.

    Args:
        path : the path to the dataset file.
        datasetName : the name of the dataset, only used in error messages.

    Raises:
        DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
        it has less than 2 columns.

    Returns:
        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
    """
    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
    # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
    try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
    # "except Exception" rather than a bare except, so that KeyboardInterrupt and SystemExit aren't
    # swallowed by the fallback attempt.
    except Exception:
        try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
        except Exception as err:
            # chained explicitly so the original pandas / OS error stays attached as the cause
            raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") from err

    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
    return dataset
415
def readPickle(path :FilePath) -> Any:
    """
    Loads and returns whatever object is stored in a .pickle file, which needs to exist at the given path.

    NOTE: unpickling can execute arbitrary code, only use this on files coming from a trusted source.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : the data inside a pickle file, could be anything.
    """
    with open(path.show(), "rb") as pickleFile:
        content = pickle.load(pickleFile)

    return content
427
def writePickle(path :FilePath, data :Any) -> None:
    """
    Serializes any data with pickle into a file, created (or overwritten) at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as pickleFile:
        pickle.dump(data, pickleFile)
440
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Parses a .csv file, which needs to exist at the given path, into a list of rows.

    Args:
        path : the path to the .csv file.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated
        by the delimiter.
    """
    with open(path.show(), "r", newline = "") as csvFile:
        allRows = list(csv.reader(csvFile, delimiter = delimiter))

    # the flag doubles as the slice start: True drops the first row, False keeps every row
    return allRows[skipHeader:]
454
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parses a .svg file, which needs to exist at the given path, with lxml.etree.

    Args:
        path : the path to the .svg file.
        customErr : error to raise in place of the parser's own one when parsing fails, if given.

    Raises:
        customErr : if provided and the file can't be parsed.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : the parser's error, when no customErr is provided.

    Returns:
        the parsed tree, as returned by lxml.etree's parse function.
    """
    try:
        return ET.parse(path.show())

    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        if customErr: raise customErr
        raise err
471
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Serializes svg data opened with lxml.etree and writes it to a .svg file, created (or overwritten)
    at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as svgFile:
        serialized = ET.tostring(data)
        svgFile.write(serialized)
484
485 # UI ARGUMENTS
class Bool:
    """
    Callable parser for boolean UI arguments given as strings. The argument's name is stored only
    to be reported in the error raised on invalid input.
    """
    def __init__(self, argName :str) -> None:
        """
        (Private) Initializes an instance of Bool.

        Args:
            argName : name of the argument being parsed, used in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> bool:
        """
        (Private) Makes the instance callable, same as check().
        """
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Parses a string into a bool, ignoring case.

        Args:
            s : the string to parse, expected to be "true" or "false" in any casing.

        Raises:
            ArgsErr : if the string is neither "true" nor "false".

        Returns:
            bool : the parsed value.
        """
        lowered = s.lower()
        parsed = {"true": True, "false": False}.get(lowered)
        if parsed is None:
            raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")

        return parsed
497
class Float:
    """
    Callable parser for float values given as strings, which also accepts "None" and "NaN"
    (in any casing) as not-a-number.
    """
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        """
        (Private) Initializes an instance of Float.

        Args:
            argName : name of the argument being parsed, used in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> float:
        """
        (Private) Makes the instance callable, same as check().
        """
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Parses a string into a float.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if float() rejects the string and it's neither "nan" nor "none" (ignoring case).

        Returns:
            float : the parsed value, math.nan for the "nan" / "none" strings.
        """
        try:
            return float(s)

        except ValueError:
            lowered = s.lower()
            if lowered in ("nan", "none"): return math.nan
            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
510
511 # MODELS
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths.
    """

    Recon = "Recon"
    ENGRO2 = "ENGRO2"
    ENGRO2_no_legend = "ENGRO2_no_legend"
    HMRcore = "HMRcore"
    HMRcore_no_legend = "HMRcore_no_legend"
    Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """
        (Private) Raises a PathErr when no path is available.

        Args:
            path : the path to check.

        Raises:
            PathErr : if the path is missing (falsy).
        """
        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder, used by non-custom models.
            customPath : path to the rules file, used by the Custom model only.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder, used by non-custom models.
            customPath : path to the translator file, used by the Custom model only.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the svg metabolic map for this model.

        Args:
            toolDir : directory containing the "local/svg metabolic maps/" folder, used by non-custom models.
            customPath : path to the map file, used by the Custom model only.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the map file can't be parsed.

        Returns:
            the parsed map, as returned by readSvg.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
        self.__raiseMissingPathErr(path)
        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))

    def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load the cobra model for this variant.

        Args:
            toolDir : directory containing the "local/models/" folder, used by non-custom models.
            customPath : path to the model file, used by the Custom model only.
            customExtension : format to load the custom model with, overriding the one in customPath.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the custom model can't be loaded.

        Returns:
            cobra.Model : the loaded model.
        """
        if self is Model.Custom:
            # same guard as the other getters: fail with a clear PathErr instead of stumbling on None later
            self.__raiseMissingPathErr(customPath)
            return self.load_custom_model(customPath, customExtension)

        return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())

    def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load a cobra model from a user-provided file, in XML (SBML) or JSON format.

        Args:
            file_path : path to the model file.
            ext : format to load the file with, defaults to the extension held by file_path.

        Raises:
            DataErr : if loading fails or if the format is neither XML nor JSON.

        Returns:
            cobra.Model : the loaded model.
        """
        ext = ext if ext else file_path.ext
        try:
            if ext is FileFormat.XML:
                return cobra.io.read_sbml_model(file_path.show())

            if ext is FileFormat.JSON:
                return cobra.io.load_json_model(file_path.show())

        except Exception as e: raise DataErr(file_path, str(e)) from e
        # report the format that was actually tested, which may be the explicit override rather than
        # the extension held by the path.
        raise DataErr(file_path,
            f"Format \"{ext}\" is not recognized, only JSON and XML files are supported.")

    def __str__(self) -> str:
        """
        (Private) converts to str representation.

        Returns:
            str : the variant's value.
        """
        return self.value