Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/general_utils.py @ 4:41f35c2f0c7b draft
Uploaded
author | luca_milaz |
---|---|
date | Wed, 18 Sep 2024 10:59:10 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
3:1f3ac6fd9867 | 4:41f35c2f0c7b |
---|---|
1 import math | |
2 import re | |
3 import sys | |
4 import csv | |
5 import pickle | |
6 import lxml.etree as ET | |
7 | |
8 from enum import Enum | |
9 from itertools import count | |
10 from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union | |
11 | |
12 import pandas as pd | |
13 import cobra | |
14 | |
15 # FILES | |
class FileFormat(Enum):
    """
    Enumerates the file extensions known to the tools, so data can be saved or loaded in the
    matching format. The value of each variant is a tuple of the extension strings it accepts.
    """
    DAT = ("dat",) # generic extension Galaxy gives to every file
    CSV = ("csv",) # nominal format of most editable input data
    TSV = ("tsv",) # format most editable input data is ACTUALLY written in. TODO: more support

    SVG = ("svg",) # usual format of metabolic maps
    PNG = ("png",) # common image output format (e.g. metabolic maps)
    PDF = ("pdf",) # another common image output format, required in publications

    XML = ("xml",) # one of the two main formats cobra models come in
    JSON = ("json",) # the other one

    PICKLE = ("pickle", "pk", "p") # format used to save all runtime data structures
    # TODO: supporting several pickle extensions internally is pointless. User-uploaded files are saved
    # as .dat by Galaxy anyway, so it only matters that these 3 names CAN be recognized as pickle
    # extensions. An internal standard must be agreed upon and used exclusively, otherwise building
    # usable paths becomes a nightmare.

    @classmethod
    def fromExt(cls, ext :str) -> "FileFormat":
        """
        Looks up the FileFormat variant matching a file extension string.

        The upper-cased extension is first compared against the variant names, then its lower-cased
        form is searched inside each variant's tuple of accepted extensions.

        Args:
            ext : the file extension as a string (without the leading dot).

        Raises:
            ValueErr : if no variant recognizes the extension.

        Returns:
            FileFormat : the variant corresponding to the extension.
        """
        upperExt = ext.upper()
        if upperExt in FileFormat.__members__:
            return FileFormat[upperExt]

        lowerExt = upperExt.lower()
        found = next((variant for variant in cls if lowerExt in variant.value), None)
        if found is None:
            raise ValueErr("ext", "a valid FileFormat file extension", ext)

        return found

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the LAST extension listed in the variant's value tuple.
        """
        # TODO: fix, taking the last entry only exists because of the multiple pickle extensions.
        *_, lastExt = self.value
        return lastExt
64 | |
class FilePath():
    """
    Represents a file path split into prefix (folders), file name and extension. Meant as a way of
    standardizing file-related operations: processes that need a file path should expect this type.
    """
    def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
        """
        (Private) Initializes an instance of FilePath.

        Args:
            filePath : the end of the path, containing the file name (no extension).
            ext : the file's extension.
            prefix : anything that comes before filePath; a trailing '/' is appended when missing.

        Returns:
            None : practically, a FilePath instance.
        """
        self.ext = ext
        self.filePath = filePath

        missingSeparator = bool(prefix) and not prefix.endswith('/')
        self.prefix = prefix + '/' if missingSeparator else prefix

    @classmethod
    def fromStrPath(cls, path :str) -> "FilePath":
        """
        Factory method parsing a string into, if possible, a valid FilePath instance.

        Args:
            path : the string containing the path.

        Raises:
            PathErr : if no file name or no extension can be recognized in the string.
            ValueErr : (from FileFormat.fromExt) if the extension is not a known FileFormat.

        Returns:
            FilePath : the constructed instance.
        """
        # This factory is mostly used to build FilePath instances out of ARGS UI arguments. Those *should*
        # already be correct paths, usable as raw strings, but nearly every function dealing with files asks
        # for FilePath objects, so the string gets parsed here only for show() to be called right after.
        # TODO: path arguments SHOULD BE OF TYPE FilePath to begin with, this ENSURES their correctness
        # when the UI is modified and avoids the pointless back-and-forth.
        parsed = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (parsed and parsed["name"] and parsed["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        folders = parsed["prefix"] or ""
        fileFormat = FileFormat.fromExt(parsed["ext"])
        return cls(parsed["name"], fileFormat, prefix = folders)

    def show(self) -> str:
        """
        Shows the path as a string.

        Returns:
            str : prefix, file name and extension joined back together.
        """
        return f"{self.prefix}{self.filePath}.{self.ext}"

    def __str__(self) -> str:
        """
        (Private) Same as show().

        Returns:
            str : the path shown as a string.
        """
        return self.show()
126 | |
127 # ERRORS | |
def terminate(msg :str) -> None:
    """
    Stops the script by exiting with an error message.

    Args:
        msg (str): The error message to be displayed.

    Returns:
        None : in practice this never returns, since sys.exit raises SystemExit.
    """
    exitMessage = f"Execution aborted: {msg}\n"
    sys.exit(exitMessage)
139 | |
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Appends a warning message to an output log file. A final period and a newline are added
    by the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately converted back (beware: relatively expensive operation, log with caution).

    Raises:
        PathErr : (from FilePath.fromStrPath) if loggerPath has no recognizable name or extension.

    Returns:
        None
    """
    # Parsing the path only to turn it back into a string looks useless, but it validates the path
    # in one place instead of at every call site. Writing to a file at every log is meant to be costly:
    # collect what went wrong and log at the very end rather than whenever something comes up. The
    # alternative would be a persistent runtime buffer that is dumped to the file when the program ends.
    validatedPath = FilePath.fromStrPath(loggerPath).show()
    with open(validatedPath, 'a') as logFile:
        logFile.write(f"{msg}.\n")
160 | |
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
    Subclasses change the label shown in the error string by overriding the `errName` class attribute.
    """
    __idGenerator = count()
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg = msg
        self.details = details

        # The counter is shared by CustomErr and all its subclasses and advances at every instantiation;
        # the explicit code is only used when it's larger than the generated one.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, logging a warning message before doing so.

        Args:
            loggerPath (str): path of the log file; when empty (default) nothing is logged.

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath: logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script, using this error's string as the exit message.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: A string representing the current CustomErr instance.
        """
        # errName is looked up through the instance so that subclasses show their own label
        # ("Args Error", "Path Error", ...) rather than always the base "Custom Error".
        return f"{self.errName} #{self.id}: {self.msg}, {self.details}."
214 | |
class ArgsErr(CustomErr):
    """
    CustomErr subclass signalling that a UI argument received an unexpected value.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ArgsErr.

        Args:
            argName : name of the offending argument.
            expected : description of what the argument should have been.
            actual : what was received instead.
            msg : further details, forwarded to CustomErr as its `details`.
        """
        description = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(description, msg)
222 | |
class DataErr(CustomErr):
    """
    CustomErr subclass signalling that a file contains badly formatted data.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of DataErr.

        Args:
            fileName : name of the file holding the malformed data.
            msg : further details, forwarded to CustomErr as its `details`.
        """
        description = f"file \"{fileName}\" contains malformed data"
        super().__init__(description, msg)
230 | |
class PathErr(CustomErr):
    """
    CustomErr subclass signalling a badly formatted file path.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of PathErr.

        Args:
            path : the invalid path; it's interpolated in the message, so anything printable works.
            msg : further details, forwarded to CustomErr as its `details`.
        """
        description = f"path \"{path}\" is invalid"
        super().__init__(description, msg)
238 | |
class ValueErr(CustomErr):
    """
    CustomErr subclass for any error about a value differing from what was expected.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        (Private) Initializes an instance of ValueErr.

        Args:
            valueName : name of the offending value; when empty, the name is left out of the message.
            expected : description of what the value should have been.
            actual : what was received instead.
            msg : further details, forwarded to CustomErr as its `details`.
        """
        quotedName = f"\"{valueName}\" " if valueName else ""
        super().__init__("value " + quotedName + f"was supposed to be {expected}, but got {actual} instead", msg)
246 | |
247 # RESULT | |
T = TypeVar('T')
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        (Private) Initializes an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.isOk = isOk
        self.isErr = not isOk
        self.value = value

    @classmethod
    def Ok(cls, value :T) -> "Result":
        """
        Constructs a new Result instance with a successful operation.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        # Built through cls so that subclasses get instances of their own type.
        return cls(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Constructs a new Result instance with a failed operation.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        # Built through cls so that subclasses get instances of their own type.
        return cls(value, isOk = False)

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        return self.value if self.isOk else default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise err

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises, the returned Result instance will be of the error kind and
        will hold the raised exception.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr: return self
        try: return Result.Ok(mapper(self.value))
        except Exception as e: return Result.Err(e)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises, this method does as well.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk: return self
        return Result.Err(mapper(self.value))

    def __str__(self):
        """
        (Private) Returns a string showing the kind of Result (Ok or Err) and the value it holds.
        """
        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
383 | |
384 # FILES | |
def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
    """
    Reads a .csv or .tsv file and returns it as a Pandas DataFrame.

    The file is first parsed as tab-separated with no header row; only if that parsing raises is it
    parsed again as comma-separated with the first row used as header.

    Args:
        path : the path to the dataset file.
        datasetName : the name of the dataset, only used in error messages.

    Raises:
        DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
        it has less than 2 columns.

    Returns:
        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
    """
    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
    # hard to implement anything better. Also, this function's name advertises it as a dataset-specific operation and
    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
    try:
        dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
    except Exception:
        # Only ordinary exceptions trigger the fallback: KeyboardInterrupt and SystemExit must propagate.
        try:
            dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
        except Exception as err:
            # Chained so the traceback still shows what pandas complained about.
            raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") from err

    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
    return dataset
413 | |
def readPickle(path :FilePath) -> Any:
    """
    Loads whatever object is stored in a .pickle file, which needs to exist at the given path.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : the data inside a pickle file, could be anything.
    """
    # NOTE(review): unpickling can execute arbitrary code, only files from trusted sources should get here.
    with open(path.show(), "rb") as pickleFile:
        contents = pickle.load(pickleFile)

    return contents
425 | |
def writePickle(path :FilePath, data :Any) -> None:
    """
    Serializes any data into a .pickle file, created (or overwritten) at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as pickleFile:
        pickle.dump(data, pickleFile)
438 | |
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Reads all rows of a .csv file, which needs to exist at the given path.

    Args:
        path : the path to the .csv file.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally
        separated by the delimiter.
    """
    with open(path.show(), "r", newline = "") as csvFile:
        rows = list(csv.reader(csvFile, delimiter = delimiter))

    # skipHeader doubles as the slice start: True drops the first row, False keeps every row.
    return rows[skipHeader:]
452 | |
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parses a .svg file, which needs to exist at the given path, with lxml.etree.

    Args:
        path : the path to the .svg file.
        customErr : error to raise in place of lxml's own when parsing fails; when not given,
        the original parsing error is raised.

    Raises:
        customErr : if given and the file can't be parsed.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : if customErr is not given and the file can't be parsed.

    Returns:
        ET.ElementTree : the parsed content of the svg file.
    """
    try:
        return ET.parse(path.show())

    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as parsingErr:
        if customErr: raise customErr
        raise parsingErr
469 | |
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Writes svg data opened with lxml.etree into a .svg file, created (or overwritten) at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as svgFile:
        serialized = ET.tostring(data)
        svgFile.write(serialized)
482 | |
483 # UI ARGUMENTS | |
class Bool:
    """
    Callable parser for boolean UI arguments given as strings. The argument's name is stored so that
    it can be reported when parsing fails.
    """
    def __init__(self, argName :str) -> None:
        """
        (Private) Initializes an instance of Bool.

        Args:
            argName : name of the argument being parsed, only used in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> bool:
        """
        (Private) Makes the instance callable, same as check().
        """
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Converts the strings "true" and "false" (not case sensitive) to the matching boolean.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if the string is neither "true" nor "false".

        Returns:
            bool : the parsed value.
        """
        s = s.lower()
        accepted = { "true" : True, "false" : False }
        if s in accepted: return accepted[s]
        raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
495 | |
class Float:
    """
    Callable parser for numeric values given as strings, which also accepts "None" and "NaN".
    The argument's name is stored so that it can be reported when parsing fails.
    """
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        """
        (Private) Initializes an instance of Float.

        Args:
            argName : name of the argument being parsed, only used in error messages.
        """
        self.argName = argName

    def __call__(self, s :str) -> float:
        """
        (Private) Makes the instance callable, same as check().
        """
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Converts a string to float; strings float() rejects are still accepted when they read
        "nan" or "none" (not case sensitive), in which case NaN is returned.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if the string is neither numeric nor one of the accepted NaN spellings.

        Returns:
            float : the parsed value.
        """
        try:
            parsed = float(s)
        except ValueError:
            s = s.lower()
            if s in ("nan", "none"): return math.nan
            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")

        return parsed
508 | |
509 # MODELS | |
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths.
    """

    Recon = "Recon"
    ENGRO2 = "ENGRO2"
    ENGRO2_no_legend = "ENGRO2_no_legend"
    HMRcore = "HMRcore"
    HMRcore_no_legend = "HMRcore_no_legend"
    Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """
        (Private) Raises a PathErr when no path is available, which happens when files are
        requested from a custom model without providing a custom path.
        """
        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : directory of the tool, the local pickle files are looked for inside it.
            customPath : path to the rules file, only used (and required) by the Custom model.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        # NOTE(review): a custom path leads to unpickling a user-provided file, which can execute arbitrary code.
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : directory of the tool, the local pickle files are looked for inside it.
            customPath : path to the translator file, only used (and required) by the Custom model.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        # NOTE(review): a custom path leads to unpickling a user-provided file, which can execute arbitrary code.
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the svg metabolic map for this model.

        Args:
            toolDir : directory of the tool, the local svg maps are looked for inside it.
            customPath : path to the map file, only used (and required) by the Custom model.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the svg file can't be parsed.

        Returns:
            ET.ElementTree : the parsed map.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
        self.__raiseMissingPathErr(path)
        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))

    def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load the COBRA model for this variant.

        Args:
            toolDir : directory of the tool, the local SBML models are looked for inside it.
            customPath : path to the model file, only used (and required) by the Custom model.
            customExtension : format to read the custom file as, overriding the extension of customPath.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the custom model can't be loaded or its format isn't supported.

        Returns:
            cobra.Model : the loaded model.
        """
        if self is Model.Custom:
            # Same guard as the other getters: fail with a clear PathErr rather than an AttributeError on None.
            self.__raiseMissingPathErr(customPath)
            return self.load_custom_model(customPath, customExtension)

        localPath = FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/")
        return cobra.io.read_sbml_model(localPath.show())

    def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load a user-provided COBRA model from an XML (SBML) or JSON file.

        Args:
            file_path : path to the model file.
            ext : format to read the file as; when not given, the extension of file_path is used.

        Raises:
            DataErr : if cobra fails to load the file or if the format is neither XML nor JSON.

        Returns:
            cobra.Model : the loaded model.
        """
        ext = ext if ext else file_path.ext
        try:
            if ext is FileFormat.XML:
                return cobra.io.read_sbml_model(file_path.show())

            if ext is FileFormat.JSON:
                return cobra.io.load_json_model(file_path.show())

        except Exception as e:
            # Chained so the traceback of the original loading failure isn't lost.
            raise DataErr(file_path, str(e)) from e

        # The format that was actually tested is reported, which may differ from the path's own extension.
        raise DataErr(file_path,
            f"Format \"{ext}\" is not recognized, only JSON and XML files are supported.")

    def __str__(self) -> str:
        """
        (Private) Returns the value of the variant, which is the model's name as a string.
        """
        return self.value