335
|
1 import math
|
|
2 import re
|
|
3 import sys
|
|
4 import csv
|
|
5 import pickle
|
|
6 import lxml.etree as ET
|
|
7
|
|
8 from enum import Enum
|
|
9 from itertools import count
|
|
10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union
|
|
11
|
|
12 import pandas as pd
|
|
13 import cobra
|
|
14
|
|
15 import zipfile
|
|
16 import gzip
|
|
17 import bz2
|
|
18 from io import StringIO
|
|
19
|
|
20 # FILES
|
|
21 class FileFormat(Enum):
|
|
22 """
|
|
23 Encodes possible file extensions to conditionally save data in a different format.
|
|
24 """
|
|
25 DAT = ("dat",) # this is how galaxy treats all your files!
|
|
26 CSV = ("csv",) # this is how most editable input data is written
|
|
27 TSV = ("tsv",) # this is how most editable input data is ACTUALLY written
|
|
28
|
|
29 SVG = ("svg",) # this is how most metabolic maps are written
|
|
30 PNG = ("png",) # this is a common output format for images (such as metabolic maps)
|
|
31 PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.
|
|
32
|
|
33 XML = ("xml","xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
|
|
34 JSON = ("json","json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
|
|
35
|
|
36 TXT = ("txt",) # this is how most output data is written
|
|
37
|
|
38 PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
|
|
39
|
|
40 def __init__(self):
|
|
41 self.original_extension = ""
|
|
42
|
|
43
|
|
44 @classmethod
|
|
45 def fromExt(cls, ext :str) -> "FileFormat":
|
|
46 """
|
|
47 Converts a file extension string to a FileFormat instance.
|
|
48
|
|
49 Args:
|
|
50 ext : The file extension as a string.
|
|
51
|
|
52 Returns:
|
|
53 FileFormat: The FileFormat instance corresponding to the file extension.
|
|
54 """
|
|
55 variantName = ext.upper()
|
|
56 if variantName in FileFormat.__members__:
|
|
57 instance = FileFormat[variantName]
|
|
58 instance.original_extension = ext
|
|
59 return instance
|
|
60
|
|
61 variantName = variantName.lower()
|
|
62 for member in cls:
|
|
63 if variantName in member.value:
|
|
64 member.original_extension = ext
|
|
65 return member
|
|
66
|
|
67 raise ValueErr("ext", "a valid FileFormat file extension", ext)
|
|
68
|
|
69 def __str__(self) -> str:
|
|
70 """
|
|
71 (Private) converts to str representation. Good practice for usage with argparse.
|
|
72
|
|
73 Returns:
|
|
74 str : the string representation of the file extension.
|
|
75 """
|
|
76
|
|
77 if(self.values[-1] in ["json", "xml"]): #return the original string extension for compressed files
|
|
78 return self.original_extension
|
|
79 else:
|
|
80 return self.value[-1] # for all other formats and pickle
|
|
81
|
|
82 class FilePath():
|
|
83 """
|
|
84 Represents a file path. View this as an attempt to standardize file-related operations by expecting
|
|
85 values of this type in any process requesting a file path.
|
|
86 """
|
|
87 def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
|
|
88 """
|
|
89 (Private) Initializes an instance of FilePath.
|
|
90
|
|
91 Args:
|
|
92 path : the end of the path, containing the file name.
|
|
93 ext : the file's extension.
|
|
94 prefix : anything before path, if the last '/' isn't there it's added by the code.
|
|
95
|
|
96 Returns:
|
|
97 None : practically, a FilePath instance.
|
|
98 """
|
|
99 self.ext = ext
|
|
100 self.filePath = filePath
|
|
101
|
|
102 if prefix and prefix[-1] != '/': prefix += '/'
|
|
103 self.prefix = prefix
|
|
104
|
|
105 @classmethod
|
|
106 def fromStrPath(cls, path :str) -> "FilePath":
|
|
107 """
|
|
108 Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
|
|
109 It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
|
|
110 These double extensions are not supported for other file types such as .csv.
|
|
111
|
|
112 Args:
|
|
113 path : the string containing the path
|
|
114
|
|
115 Raises:
|
|
116 PathErr : if the provided string doesn't represent a valid path.
|
|
117
|
|
118 Returns:
|
|
119 FilePath : the constructed instance.
|
|
120 """
|
|
121 # This method is often used to construct FilePath instances from ARGS UI arguments. These arguments *should*
|
|
122 # always be correct paths and could be used as raw strings, however most if not all functions that work with
|
|
123 # file paths request the FilePath objects specifically, which is a very good thing in any case other than this.
|
|
124 # What ends up happening is we spend time parsing a string into a FilePath so that the function accepts it, only
|
|
125 # to call show() immediately to bring back the string and open the file it points to.
|
|
126 # TODO: this is an indication that the arguments SHOULD BE OF TYPE FilePath if they are filepaths, this ENSURES
|
|
127 # their correctness when modifying the UI and avoids the pointless back-and-forth.
|
|
128 result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
|
|
129 if not result or not result["name"] or not result["ext"]:
|
|
130 raise PathErr(path, "cannot recognize folder structure or extension in path")
|
|
131
|
|
132 prefix = result["prefix"] if result["prefix"] else ""
|
|
133 name, ext = result["name"], result["ext"]
|
|
134
|
|
135 # Split path into parts
|
|
136 parts = path.split(".")
|
|
137 if len(parts) >= 3:
|
|
138 penultimate = parts[-2]
|
|
139 last = parts[-1]
|
|
140 if penultimate in {"json", "xml"}:
|
|
141 name = ".".join(parts[:-2])
|
|
142 ext = f"{penultimate}.{last}"
|
|
143
|
|
144 return cls(name, FileFormat.fromExt(ext), prefix=prefix)
|
|
145
|
|
146 def show(self) -> str:
|
|
147 """
|
|
148 Shows the path as a string.
|
|
149
|
|
150 Returns:
|
|
151 str : the path shown as a string.
|
|
152 """
|
|
153 return f"{self.prefix}{self.filePath}.{self.ext}"
|
|
154
|
|
155 def __str__(self) -> str: return self.show()
|
|
156
|
|
157 # ERRORS
|
|
158 def terminate(msg :str) -> None:
|
|
159 """
|
|
160 Terminate the execution of the script with an error message.
|
|
161
|
|
162 Args:
|
|
163 msg (str): The error message to be displayed.
|
|
164
|
|
165 Returns:
|
|
166 None
|
|
167 """
|
|
168 sys.exit(f"Execution aborted: {msg}\n")
|
|
169
|
|
170 def logWarning(msg :str, loggerPath :str) -> None:
|
|
171 """
|
|
172 Log a warning message to an output log file and print it to the console. The final period and a
|
|
173 newline is added by the function.
|
|
174
|
|
175 Args:
|
|
176 s (str): The warning message to be logged and printed.
|
|
177 loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
|
|
178 immediately read back (beware relative expensive operation, log with caution).
|
|
179
|
|
180 Returns:
|
|
181 None
|
|
182 """
|
|
183 # building the path and then reading it immediately seems useless, but it's actually a way of
|
|
184 # validating that reduces repetition on the caller's side. Besides, logging a message by writing
|
|
185 # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
|
|
186 # mindlessly logging whenever something comes up, log at the very end and tell the user everything
|
|
187 # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
|
|
188 # the file only at the end of the program's execution.
|
|
189 with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
|
|
190
|
|
191 class CustomErr(Exception):
|
|
192 """
|
|
193 Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
|
|
194 """
|
|
195 __idGenerator = count()
|
|
196 errName = "Custom Error"
|
|
197 def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
|
|
198 """
|
|
199 (Private) Initializes an instance of CustomErr.
|
|
200
|
|
201 Args:
|
|
202 msg (str): Error message to be displayed.
|
|
203 details (str): Informs the user more about the error encountered. Defaults to "".
|
|
204 explicitErrCode (int): Explicit error code to be used. Defaults to -1.
|
|
205
|
|
206 Returns:
|
|
207 None : practically, a CustomErr instance.
|
|
208 """
|
|
209 self.msg = msg
|
|
210 self.details = details
|
|
211
|
|
212 self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
|
|
213
|
|
214 def throw(self, loggerPath = "") -> None:
|
|
215 """
|
|
216 Raises the current CustomErr instance, logging a warning message before doing so.
|
|
217
|
|
218 Raises:
|
|
219 self: The current CustomErr instance.
|
|
220
|
|
221 Returns:
|
|
222 None
|
|
223 """
|
|
224 if loggerPath: logWarning(str(self), loggerPath)
|
|
225 raise self
|
|
226
|
|
227 def abort(self) -> None:
|
|
228 """
|
|
229 Aborts the execution of the script.
|
|
230
|
|
231 Returns:
|
|
232 None
|
|
233 """
|
|
234 terminate(str(self))
|
|
235
|
|
236 def __str__(self) -> str:
|
|
237 """
|
|
238 (Private) Returns a string representing the current CustomErr instance.
|
|
239
|
|
240 Returns:
|
|
241 str: A string representing the current CustomErr instance.
|
|
242 """
|
|
243 return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
|
|
244
|
|
245 class ArgsErr(CustomErr):
|
|
246 """
|
|
247 CustomErr subclass for UI arguments errors.
|
|
248 """
|
|
249 errName = "Args Error"
|
|
250 def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
|
|
251 super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
|
|
252
|
|
253 class DataErr(CustomErr):
|
|
254 """
|
|
255 CustomErr subclass for data formatting errors.
|
|
256 """
|
|
257 errName = "Data Format Error"
|
|
258 def __init__(self, fileName :str, msg = "no further details provided") -> None:
|
|
259 super().__init__(f"file \"{fileName}\" contains malformed data", msg)
|
|
260
|
|
261 class PathErr(CustomErr):
|
|
262 """
|
|
263 CustomErr subclass for filepath formatting errors.
|
|
264 """
|
|
265 errName = "Path Error"
|
|
266 def __init__(self, path :FilePath, msg = "no further details provided") -> None:
|
|
267 super().__init__(f"path \"{path}\" is invalid", msg)
|
|
268
|
|
269 class ValueErr(CustomErr):
|
|
270 """
|
|
271 CustomErr subclass for any value error.
|
|
272 """
|
|
273 errName = "Value Error"
|
|
274 def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
|
|
275 super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
|
|
276
|
|
277 # RESULT
|
|
278 T = TypeVar('T')
|
|
279 E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
|
|
280 class Result(Generic[T, E]):
|
|
281 class ResultErr(CustomErr):
|
|
282 """
|
|
283 CustomErr subclass for all Result errors.
|
|
284 """
|
|
285 errName = "Result Error"
|
|
286 def __init__(self, msg = "no further details provided") -> None:
|
|
287 super().__init__(msg)
|
|
288 """
|
|
289 Class to handle the result of an operation, with a value and a boolean flag to indicate
|
|
290 whether the operation was successful or not.
|
|
291 """
|
|
292 def __init__(self, value :Union[T, E], isOk :bool) -> None:
|
|
293 """
|
|
294 (Private) Initializes an instance of Result.
|
|
295
|
|
296 Args:
|
|
297 value (Union[T, E]): The value to be stored in the Result instance.
|
|
298 isOk (bool): A boolean flag to indicate whether the operation was successful or not.
|
|
299
|
|
300 Returns:
|
|
301 None : practically, a Result instance.
|
|
302 """
|
|
303 self.isOk = isOk
|
|
304 self.isErr = not isOk
|
|
305 self.value = value
|
|
306
|
|
307 @classmethod
|
|
308 def Ok(cls, value :T) -> "Result":
|
|
309 """
|
|
310 Constructs a new Result instance with a successful operation.
|
|
311
|
|
312 Args:
|
|
313 value (T): The value to be stored in the Result instance, set as successful.
|
|
314
|
|
315 Returns:
|
|
316 Result: A new Result instance with a successful operation.
|
|
317 """
|
|
318 return Result(value, isOk = True)
|
|
319
|
|
320 @classmethod
|
|
321 def Err(cls, value :E) -> "Result":
|
|
322 """
|
|
323 Constructs a new Result instance with a failed operation.
|
|
324
|
|
325 Args:
|
|
326 value (E): The value to be stored in the Result instance, set as failed.
|
|
327
|
|
328 Returns:
|
|
329 Result: A new Result instance with a failed operation.
|
|
330 """
|
|
331 return Result(value, isOk = False)
|
|
332
|
|
333 def unwrap(self) -> T:
|
|
334 """
|
|
335 Unwraps the value of the Result instance, if the operation was successful.
|
|
336
|
|
337 Raises:
|
|
338 ResultErr: If the operation was not successful.
|
|
339
|
|
340 Returns:
|
|
341 T: The value of the Result instance, if the operation was successful.
|
|
342 """
|
|
343 if self.isOk: return self.value
|
|
344 raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
|
|
345
|
|
346 def unwrapOr(self, default :T) -> T:
|
|
347 """
|
|
348 Unwraps the value of the Result instance, if the operation was successful, otherwise
|
|
349 it returns a default value.
|
|
350
|
|
351 Args:
|
|
352 default (T): The default value to be returned if the operation was not successful.
|
|
353
|
|
354 Returns:
|
|
355 T: The value of the Result instance, if the operation was successful,
|
|
356 otherwise the default value.
|
|
357 """
|
|
358 return self.value if self.isOk else default
|
|
359
|
|
360 def expect(self, err :"Result.ResultErr") -> T:
|
|
361 """
|
|
362 Expects that the value of the Result instance is successful, otherwise it raises an error.
|
|
363
|
|
364 Args:
|
|
365 err (Exception): The error to be raised if the operation was not successful.
|
|
366
|
|
367 Raises:
|
|
368 err: The error raised if the operation was not successful.
|
|
369
|
|
370 Returns:
|
|
371 T: The value of the Result instance, if the operation was successful.
|
|
372 """
|
|
373 if self.isOk: return self.value
|
|
374 raise err
|
|
375
|
|
376 U = TypeVar("U")
|
|
377 def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
|
|
378 """
|
|
379 Maps the value of the current Result to whatever is returned by the mapper function.
|
|
380 If the Result contained an unsuccessful operation to begin with it remains unchanged
|
|
381 (a reference to the current instance is returned).
|
|
382 If the mapper function panics the returned result instance will be of the error kind.
|
|
383
|
|
384 Args:
|
|
385 mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
|
|
386
|
|
387 Returns:
|
|
388 Result[U, E]: The result of the mapper operation applied to the Result value.
|
|
389 """
|
|
390 if self.isErr: return self
|
|
391 try: return Result.Ok(mapper(self.value))
|
|
392 except Exception as e: return Result.Err(e)
|
|
393
|
|
394 D = TypeVar("D", bound = "Result.ResultErr")
|
|
395 def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
|
|
396 """
|
|
397 Maps the error of the current Result to whatever is returned by the mapper function.
|
|
398 If the Result contained a successful operation it remains unchanged
|
|
399 (a reference to the current instance is returned).
|
|
400 If the mapper function panics this method does as well.
|
|
401
|
|
402 Args:
|
|
403 mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
|
|
404
|
|
405 Returns:
|
|
406 Result[U, E]: The result of the mapper operation applied to the Result error.
|
|
407 """
|
|
408 if self.isOk: return self
|
|
409 return Result.Err(mapper(self.value))
|
|
410
|
|
411 def __str__(self):
|
|
412 return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
|
|
413
|
|
414 # FILES
|
|
415 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
|
|
416 """
|
|
417 Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
|
|
418
|
|
419 Args:
|
|
420 path : the path to the dataset file.
|
|
421 datasetName : the name of the dataset.
|
|
422
|
|
423 Raises:
|
|
424 DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
|
|
425 it has less than 2 columns.
|
|
426
|
|
427 Returns:
|
|
428 pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
|
|
429 """
|
|
430 # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
|
|
431 # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
|
|
432 # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
|
|
433 # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
|
|
434 # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
|
|
435 # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
|
|
436 try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
|
|
437 except:
|
|
438 try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
|
|
439 except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
|
|
440
|
|
441 if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
|
|
442 return dataset
|
|
443
|
|
444 def readPickle(path :FilePath) -> Any:
|
|
445 """
|
|
446 Reads the contents of a .pickle file, which needs to exist at the given path.
|
|
447
|
|
448 Args:
|
|
449 path : the path to the .pickle file.
|
|
450
|
|
451 Returns:
|
|
452 Any : the data inside a pickle file, could be anything.
|
|
453 """
|
|
454 with open(path.show(), "rb") as fd: return pickle.load(fd)
|
|
455
|
|
456 def writePickle(path :FilePath, data :Any) -> None:
|
|
457 """
|
|
458 Saves any data in a .pickle file, created at the given path.
|
|
459
|
|
460 Args:
|
|
461 path : the path to the .pickle file.
|
|
462 data : the data to be written to the file.
|
|
463
|
|
464 Returns:
|
|
465 None
|
|
466 """
|
|
467 with open(path.show(), "wb") as fd: pickle.dump(data, fd)
|
|
468
|
|
469 def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
|
|
470 """
|
|
471 Reads the contents of a .csv file, which needs to exist at the given path.
|
|
472
|
|
473 Args:
|
|
474 path : the path to the .csv file.
|
|
475 delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
|
|
476 skipHeader : whether the first row of the file is a header and should be skipped.
|
|
477
|
|
478 Returns:
|
|
479 List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
|
|
480 """
|
|
481 with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
|
|
482
|
|
483 def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
|
|
484 """
|
|
485 Reads the contents of a .svg file, which needs to exist at the given path.
|
|
486
|
|
487 Args:
|
|
488 path : the path to the .svg file.
|
|
489
|
|
490 Raises:
|
|
491 DataErr : if the map is malformed.
|
|
492
|
|
493 Returns:
|
|
494 Any : the data inside a svg file, could be anything.
|
|
495 """
|
|
496 try: return ET.parse(path.show())
|
|
497 except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
|
|
498 raise customErr if customErr else err
|
|
499
|
|
500 def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
|
|
501 """
|
|
502 Saves svg data opened with lxml.etree in a .svg file, created at the given path.
|
|
503
|
|
504 Args:
|
|
505 path : the path to the .svg file.
|
|
506 data : the data to be written to the file.
|
|
507
|
|
508 Returns:
|
|
509 None
|
|
510 """
|
|
511 with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
|
|
512
|
|
513 # UI ARGUMENTS
|
|
514 class Bool:
|
|
515 def __init__(self, argName :str) -> None:
|
|
516 self.argName = argName
|
|
517
|
|
518 def __call__(self, s :str) -> bool: return self.check(s)
|
|
519
|
|
520 def check(self, s :str) -> bool:
|
|
521 s = s.lower()
|
|
522 if s == "true" : return True
|
|
523 if s == "false": return False
|
|
524 raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
|
|
525
|
|
526 class Float:
|
|
527 def __init__(self, argName = "Dataset values, not an argument") -> None:
|
|
528 self.argName = argName
|
|
529
|
|
530 def __call__(self, s :str) -> float: return self.check(s)
|
|
531
|
|
532 def check(self, s :str) -> float:
|
|
533 try: return float(s)
|
|
534 except ValueError:
|
|
535 s = s.lower()
|
|
536 if s == "nan" or s == "none": return math.nan
|
|
537 raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
|
|
538
|
|
539 # MODELS
|
|
540 OldRule = List[Union[str, "OldRule"]]
|
|
541 class Model(Enum):
|
|
542 """
|
|
543 Represents a metabolic model, either custom or locally supported. Custom models don't point
|
|
544 to valid file paths.
|
|
545 """
|
|
546
|
|
547 Recon = "Recon"
|
|
548 ENGRO2 = "ENGRO2"
|
|
549 ENGRO2_no_legend = "ENGRO2_no_legend"
|
|
550 HMRcore = "HMRcore"
|
|
551 HMRcore_no_legend = "HMRcore_no_legend"
|
|
552 Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
|
|
553
|
|
554 def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
|
|
555 if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
|
|
556
|
|
557 def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
|
|
558 """
|
|
559 Open "rules" file for this model.
|
|
560
|
|
561 Returns:
|
|
562 Dict[str, Dict[str, OldRule]] : the rules for this model.
|
|
563 """
|
|
564 path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
|
|
565 self.__raiseMissingPathErr(path)
|
|
566 return readPickle(path)
|
|
567
|
|
568 def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
|
|
569 """
|
|
570 Open "gene translator (old: gene_in_rule)" file for this model.
|
|
571
|
|
572 Returns:
|
|
573 Dict[str, Dict[str, str]] : the translator dict for this model.
|
|
574 """
|
|
575 path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
|
|
576 self.__raiseMissingPathErr(path)
|
|
577 return readPickle(path)
|
|
578
|
|
579 def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
|
|
580 path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
|
|
581 self.__raiseMissingPathErr(path)
|
|
582 return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
|
|
583
|
|
584 def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
|
|
585 if(self is Model.Custom):
|
|
586 return self.load_custom_model(customPath, customExtension)
|
|
587 else:
|
|
588 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
|
|
589
|
|
590 def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
|
|
591 ext = ext if ext else file_path.ext
|
|
592 try:
|
|
593 if ext in FileFormat.XML:
|
|
594 return cobra.io.read_sbml_model(file_path.show())
|
|
595
|
|
596 if ext in FileFormat.JSON:
|
|
597 # Compressed files are not automatically handled by cobra
|
|
598 if(ext == "json"):
|
|
599 return cobra.io.load_json_model(file_path.show())
|
|
600 else:
|
|
601 return self.extract_json_model(file_path, ext)
|
|
602
|
|
603 except Exception as e: raise DataErr(file_path, e.__str__())
|
|
604 raise DataErr(file_path,
|
|
605 f"Fomat \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.")
|
|
606
|
|
607
|
|
608 def extract_json_model(file_path:FilePath, ext :FileFormat) -> cobra.Model:
|
|
609 """
|
|
610 Extract json COBRA model from a compressed file (zip, gz, bz2).
|
|
611
|
|
612 Args:
|
|
613 file_path: File path of the model
|
|
614 ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
|
|
615
|
|
616 Returns:
|
|
617 cobra.Model: COBRApy model
|
|
618
|
|
619 Raises:
|
|
620 Exception: Extraction errors
|
|
621 """
|
|
622 ext_str = str(ext)
|
|
623
|
|
624 try:
|
|
625 if '.zip' in ext_str:
|
|
626 with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
|
|
627 with zip_ref.open(zip_ref.namelist()[0]) as json_file:
|
|
628 content = json_file.read().decode('utf-8')
|
|
629 return cobra.io.load_json_model(StringIO(content))
|
|
630 elif '.gz' in ext_str:
|
|
631 with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
|
|
632 return cobra.io.load_json_model(gz_ref)
|
|
633 elif '.bz2' in ext_str:
|
|
634 with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
|
|
635 return cobra.io.load_json_model(bz2_ref)
|
|
636 else:
|
|
637 raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")
|
|
638
|
|
639 except Exception as e:
|
|
640 raise Exception(f"Error during model extraction: {str(e)}")
|
|
641
|
|
642
|
|
643
|
240
|
644 def __str__(self) -> str: return self.value |