Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/general_utils.py @ 240:63f5078627a9 draft
Uploaded
author | francesco_lapi |
---|---|
date | Mon, 13 Jan 2025 10:01:40 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
239:7bd33d296319 | 240:63f5078627a9 |
---|---|
1 import math | |
2 import re | |
3 import sys | |
4 import csv | |
5 import pickle | |
6 import lxml.etree as ET | |
7 | |
8 from enum import Enum | |
9 from itertools import count | |
10 from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union | |
11 | |
12 import pandas as pd | |
13 import cobra | |
14 | |
15 # FILES | |
class FileFormat(Enum):
    """
    Known file extensions, used to decide in which format a piece of data is read or saved.

    The value of every variant is a tuple holding all the extension spellings accepted for it.
    """
    DAT = ("dat",) # the extension galaxy gives to all files
    CSV = ("csv",) # how most editable input data is written
    TSV = ("tsv",) # how most editable input data is ACTUALLY written TODO: more support needed

    SVG = ("svg",) # how most metabolic maps are written
    PNG = ("png",) # common output format for images (such as metabolic maps)
    PDF = ("pdf",) # another common image output format, required in publications

    XML = ("xml",) # one of the two main formats cobra models appear in
    JSON = ("json",) # the other one

    TXT = ("txt",) # how most output data is written

    PICKLE = ("pickle", "pk", "p") # how all runtime data structures are saved
    # TODO: there is no point in supporting many pickle extensions internally. User-uploaded files are
    # saved as .dat by galaxy anyway, so it only matters that these 3 names CAN be recognized as valid
    # pickle extensions. An internal standard must however be agreed upon and used exclusively,
    # otherwise constructing usable paths becomes a nightmare.

    @classmethod
    def fromExt(cls, ext :str) -> "FileFormat":
        """
        Looks up the FileFormat variant matching a file extension.

        The upper-cased extension is first tried as a variant name, then its lower-cased form is
        searched among the spellings listed in each variant's value.

        Args:
            ext : The file extension as a string.

        Raises:
            ValueErr : if no variant recognizes the extension.

        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        upperCased = ext.upper()
        namedVariant = FileFormat.__members__.get(upperCased)
        if namedVariant is not None:
            return namedVariant

        lowerCased = upperCased.lower()
        spelledVariant = next((variant for variant in cls if lowerCased in variant.value), None)
        if spelledVariant is None:
            raise ValueErr("ext", "a valid FileFormat file extension", ext)

        return spelledVariant

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the last extension spelling listed in this variant's value.
        """
        # TODO: picking the last spelling is a workaround for PICKLE having several of them.
        *_, lastSpelling = self.value
        return lastSpelling
66 | |
class FilePath():
    """
    Represents a file path, split into an optional prefix, a file name and an extension.
    It is meant to standardize file-related operations: any process requesting a file path
    should expect a value of this type.
    """
    def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
        """
        (Private) Initializes an instance of FilePath.

        Args:
            filePath : the end of the path, containing the file name (without extension).
            ext : the file's extension.
            prefix : anything before filePath, a trailing '/' is appended when missing.

        Returns:
            None : practically, a FilePath instance.
        """
        if prefix and not prefix.endswith('/'):
            prefix = prefix + '/'

        self.prefix = prefix
        self.filePath = filePath
        self.ext = ext

    @classmethod
    def fromStrPath(cls, path :str) -> "FilePath":
        """
        Factory method parsing a string into a FilePath instance, when possible.

        Args:
            path : the string containing the path

        Raises:
            PathErr : if no file name or no extension can be recognized in the string.

        Returns:
            FilePath : the constructed instance.
        """
        # This method is mostly used to build FilePath instances out of ARGS UI arguments. Those *should* always be
        # correct paths usable as raw strings, but functions working with files request FilePath objects, so the string
        # gets parsed only for show() to be called right after to open the file.
        # TODO: arguments that are file paths SHOULD BE OF TYPE FilePath from the start, this ENSURES their
        # correctness when the UI is modified and avoids the pointless back-and-forth.
        parsed = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (parsed and parsed["name"] and parsed["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        folder = parsed["prefix"] or ""
        fileFormat = FileFormat.fromExt(parsed["ext"])
        return cls(parsed["name"], fileFormat, prefix = folder)

    def show(self) -> str:
        """
        Shows the path as a string.

        Returns:
            str : prefix, file name and extension joined back together.
        """
        withoutExt = f"{self.prefix}{self.filePath}"
        return f"{withoutExt}.{self.ext}"

    def __str__(self) -> str:
        return self.show()
128 | |
129 # ERRORS | |
def terminate(msg :str) -> None:
    """
    Stops the script through sys.exit, using an "Execution aborted" message as exit status.

    Args:
        msg (str): The error message to be displayed.

    Returns:
        None
    """
    exitMessage = f"Execution aborted: {msg}\n"
    sys.exit(exitMessage)
141 | |
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Appends a warning message to the log file at the given path. A final period and a newline
    are added to the message by the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately read back (beware: relatively expensive operation, log with caution).

    Returns:
        None
    """
    # Parsing the path only to turn it back into a string looks useless, but it validates the path
    # without repeating the check at every call site. Logging by writing to a file is expensive anyway,
    # which should discourage logging at every occasion: log at the very end and tell the user everything
    # that went wrong. A persistent runtime buffer dumped to the file when the program ends would be
    # the alternative.
    logFile = FilePath.fromStrPath(loggerPath)
    with open(logFile.show(), 'a') as log:
        line = f"{msg}.\n"
        log.write(line)
162 | |
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with an identifier and a message.

    Subclasses customize the label shown in the string representation by overriding `errName`.
    """
    __idGenerator = count() # shared by all instances (subclasses included), advanced at every construction
    errName = "Custom Error"
    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1. It is only retained
            when it's greater than the next automatically generated id.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg = msg
        self.details = details

        # the shared counter is consumed even when the explicit code ends up being used
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, logging a warning message before doing so
        when a log file path is given.

        Args:
            loggerPath : path of the log file, nothing is logged when left empty.

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath: logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script, using this error's string representation as the message.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: A string representing the current CustomErr instance.
        """
        # errName is looked up on the instance's own class so that subclasses overriding it
        # (e.g. "Args Error") are labelled correctly instead of always showing "Custom Error".
        return f"{type(self).errName} #{self.id}: {self.msg}, {self.details}."
216 | |
class ArgsErr(CustomErr):
    """
    CustomErr subclass reporting a UI argument whose value is not the expected one.
    """
    errName = "Args Error"
    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        # the mismatch description is the main message, msg is forwarded as the error details
        summary = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(summary, msg)
224 | |
class DataErr(CustomErr):
    """
    CustomErr subclass reporting malformed data inside a file.
    """
    errName = "Data Format Error"
    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        # the file is named in the main message, msg is forwarded as the error details
        summary = f"file \"{fileName}\" contains malformed data"
        super().__init__(summary, msg)
232 | |
class PathErr(CustomErr):
    """
    CustomErr subclass reporting an invalid file path.
    """
    errName = "Path Error"
    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        # the path is interpolated in the main message, msg is forwarded as the error details
        summary = f"path \"{path}\" is invalid"
        super().__init__(summary, msg)
240 | |
class ValueErr(CustomErr):
    """
    CustomErr subclass reporting any value different from what was expected.
    """
    errName = "Value Error"
    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        # the quoted name is left out of the message entirely when valueName is empty
        quotedName = f"\"{valueName}\" " if valueName else ""
        summary = "value " + quotedName + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(summary, msg)
248 | |
249 # RESULT | |
T = TypeVar('T')
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    # NOTE: the docstring above must stay the first statement of the class body, otherwise it is
    # not picked up as the class documentation.
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        (Private) Initializes an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.isOk = isOk
        self.isErr = not isOk
        self.value = value

    @classmethod
    def Ok(cls, value :T) -> "Result":
        """
        Constructs a new Result instance with a successful operation.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        return Result(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Constructs a new Result instance with a failed operation.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        return Result(value, isOk = False)

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        return self.value if self.isOk else default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise err

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises, the returned Result instance will be of the error kind
        and will hold the raised exception.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr: return self
        try: return Result.Ok(mapper(self.value))
        except Exception as e: return Result.Err(e)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises, this method does as well.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk: return self
        return Result.Err(mapper(self.value))

    def __str__(self):
        """
        (Private) Returns "Result::Ok(value)" or "Result::Err(value)" depending on the outcome.
        """
        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
385 | |
386 # FILES | |
def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
    """
    Reads a .csv or .tsv file and returns it as a Pandas DataFrame.

    The file is first read as tab-separated without a header row; only if that attempt raises
    is it read again as comma-separated, with the first row used as header.

    Args:
        path : the path to the dataset file.
        datasetName : the name of the dataset, only used in error messages.

    Raises:
        DataErr: If both read attempts fail (e.g. the file can't be opened or pandas thinks the dataset
        is empty) or if the dataset has less than 2 columns.

    Returns:
        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
    """
    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
    # hard to implement anything better. Also, this function's name advertises it as a dataset-specific operation and
    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
    try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
    # "except Exception" instead of a bare "except": KeyboardInterrupt and SystemExit must not
    # be mistaken for a parsing failure and trigger the second attempt.
    except Exception:
        try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
        except Exception as err:
            # chaining keeps the original pandas traceback available for debugging
            raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") from err

    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
    return dataset
415 | |
def readPickle(path :FilePath) -> Any:
    """
    Loads the object stored in a .pickle file, which needs to exist at the given path.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : the data inside a pickle file, could be anything.
    """
    # NOTE(review): unpickling can execute arbitrary code, make sure callers only pass trusted files.
    with open(path.show(), "rb") as pickleFile:
        content = pickle.load(pickleFile)
        return content
427 | |
def writePickle(path :FilePath, data :Any) -> None:
    """
    Pickles any data into a file opened for binary writing at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    target = path.show()
    with open(target, "wb") as pickleFile:
        pickle.dump(data, pickleFile)
440 | |
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Reads all the rows of a .csv file, which needs to exist at the given path.

    Args:
        path : the path to the .csv file.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally
        separated by the delimiter.
    """
    with open(path.show(), "r", newline = "") as csvFile:
        rows = [row for row in csv.reader(csvFile, delimiter = delimiter)]

    # skipHeader doubles as the slice start: True drops the first row, False keeps every row
    return rows[skipHeader:]
454 | |
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parses a .svg file with lxml.etree, the file needs to exist at the given path.

    Args:
        path : the path to the .svg file.
        customErr : error raised in place of the parser's one when the file can't be parsed.

    Raises:
        customErr : if provided and the parser reports a syntax or schema parse error.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : the parser's own error, when customErr isn't provided.

    Returns:
        ET.ElementTree : the parsed content of the svg file.
    """
    try:
        tree = ET.parse(path.show())
    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        if customErr:
            raise customErr
        raise err

    return tree
471 | |
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Serializes svg data opened with lxml.etree and writes it to a file at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    target = path.show()
    with open(target, "wb") as svgFile:
        serialized = ET.tostring(data)
        svgFile.write(serialized)
484 | |
485 # UI ARGUMENTS | |
class Bool:
    """
    Callable parser turning the strings "true" / "false" (not case sensitive) into booleans.
    """
    def __init__(self, argName :str) -> None:
        # name of the argument being parsed, only used to build the error
        self.argName = argName

    def __call__(self, s :str) -> bool:
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Parses the given string as a boolean.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if the lower-cased string is neither "true" nor "false".

        Returns:
            bool : the parsed value.
        """
        lowered = s.lower()
        if lowered in ("true", "false"):
            return lowered == "true"

        raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")
497 | |
class Float:
    """
    Callable parser turning numeric strings into floats, "None" and "NaN" (not case sensitive)
    are parsed as math.nan.
    """
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        # name of the argument being parsed, only used to build the error
        self.argName = argName

    def __call__(self, s :str) -> float:
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Parses the given string as a float.

        Args:
            s : the string to parse.

        Raises:
            ArgsErr : if float() rejects the string and its lower-cased form is neither "nan" nor "none".

        Returns:
            float : the parsed value.
        """
        try:
            parsed = float(s)
        except ValueError:
            lowered = s.lower()
            if lowered in ("nan", "none"):
                return math.nan

            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
        else:
            return parsed
510 | |
511 # MODELS | |
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths.
    """

    Recon = "Recon"
    ENGRO2 = "ENGRO2"
    ENGRO2_no_legend = "ENGRO2_no_legend"
    HMRcore = "HMRcore"
    HMRcore_no_legend = "HMRcore_no_legend"
    Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """
        (Private) Raises a PathErr when no path is given, does nothing otherwise.
        """
        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder of the supported models.
            customPath : path to the rules file, used only by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder of the supported models.
            customPath : path to the translator file, used only by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is given.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the svg metabolic map for this model.

        Args:
            toolDir : directory containing the "local/svg metabolic maps/" folder of the supported models.
            customPath : path to the map file, used only by the Custom variant.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the svg parser rejects the map.

        Returns:
            ET.ElementTree : the parsed map.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
        self.__raiseMissingPathErr(path)
        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))

    def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load the cobra model for this variant.

        Args:
            toolDir : directory containing the "local/models/" folder of the supported models.
            customPath : path to the model file, used only by the Custom variant.
            customExtension : format of the custom model file, overrides the extension of customPath when given.

        Raises:
            PathErr : if the model is Custom and no customPath is given.
            DataErr : if the custom model can't be loaded or its format isn't supported.

        Returns:
            cobra.Model : the loaded model.
        """
        if self is Model.Custom:
            return self.load_custom_model(customPath, customExtension)

        return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())

    def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load a cobra model from a user-provided XML (SBML) or JSON file.

        Args:
            file_path : path to the model file.
            ext : format of the file, the extension of file_path is used when not given.

        Raises:
            PathErr : if file_path is missing.
            DataErr : if cobra fails to load the file or the format is neither XML nor JSON.

        Returns:
            cobra.Model : the loaded model.
        """
        # same guard used by the other getters: fail with a PathErr rather than an
        # AttributeError on None when the custom path was never provided.
        self.__raiseMissingPathErr(file_path)

        ext = ext if ext else file_path.ext
        try:
            if ext is FileFormat.XML:
                return cobra.io.read_sbml_model(file_path.show())

            if ext is FileFormat.JSON:
                return cobra.io.load_json_model(file_path.show())

        except Exception as e: raise DataErr(file_path, str(e)) from e

        # report the format that was actually checked (the explicit override, if any)
        raise DataErr(file_path,
            f"Format \"{ext}\" is not recognized, only JSON and XML files are supported.")

    def __str__(self) -> str: return self.value