comparison cobraxy-9688ad27287b/COBRAxy/flux_to_map.py @ 90:a48b2e06ebe7 draft

Uploaded
author luca_milaz
date Sun, 13 Oct 2024 11:35:56 +0000
1 from __future__ import division
2 import csv
3 from enum import Enum
4 import re
5 import sys
6 import numpy as np
7 import pandas as pd
8 import itertools as it
9 import scipy.stats as st
10 import lxml.etree as ET
11 import math
12 import utils.general_utils as utils
13 from PIL import Image
14 import os
15 import copy
16 import argparse
17 import pyvips
18 from PIL import Image, ImageDraw, ImageFont
19 from typing import Tuple, Union, Optional, List, Dict
20 import matplotlib.pyplot as plt
21
22 ERRORS = []
23 ########################## argparse ##########################################
24 ARGS :argparse.Namespace
25 def process_args() -> argparse.Namespace:
26 """
27 Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
28
29 Args:
30 args : Always obtained (in file) from sys.argv
31
32 Returns:
33 Namespace : An object containing the parsed arguments
34 """
35 parser = argparse.ArgumentParser(
36 usage = "%(prog)s [options]",
37 description = "process some reaction flux values to create a comparison map.")
38
39 #General:
40 parser.add_argument(
41 '-td', '--tool_dir',
42 type = str,
43 required = True,
44 help = 'your tool directory')
45
46 parser.add_argument('-on', '--control', type = str)
47 parser.add_argument('-ol', '--out_log', help = "Output log")
48
49 #Computation details:
50 parser.add_argument(
51 '-co', '--comparison',
52 type = str,
53 default = 'manyvsmany',
54 choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
55
56 parser.add_argument(
57 '-pv' ,'--pValue',
58 type = float,
59 default = 0.1,
60 help = 'P-Value threshold (default: %(default)s)')
61
62 parser.add_argument(
63 '-fc', '--fChange',
64 type = float,
65 default = 1.5,
66 help = 'Fold-Change threshold (default: %(default)s)')
67
68
69 parser.add_argument(
70 '-op', '--option',
71 type = str,
72 choices = ['datasets', 'dataset_class'],
73 help='dataset or dataset and class')
74
75 parser.add_argument(
76 '-idf', '--input_data_fluxes',
77 type = str,
78 help = 'input dataset fluxes')
79
80 parser.add_argument(
81 '-icf', '--input_class_fluxes',
82 type = str,
83 help = 'sample group specification fluxes')
84
85 parser.add_argument(
86 '-idsf', '--input_datas_fluxes',
87 type = str,
88 nargs = '+',
89 help = 'input datasets fluxes')
90
91 parser.add_argument(
92 '-naf', '--names_fluxes',
93 type = str,
94 nargs = '+',
95 help = 'input names fluxes')
96
97 #Output:
98 parser.add_argument(
99 "-gs", "--generate_svg",
100 type = utils.Bool("generate_svg"), default = True,
101 help = "choose whether to generate svg")
102
103 parser.add_argument(
104 "-gp", "--generate_pdf",
105 type = utils.Bool("generate_pdf"), default = True,
106 help = "choose whether to generate pdf")
107
108 parser.add_argument(
109 '-cm', '--custom_map',
110 type = str,
111 help='custom map to use')
112
113 parser.add_argument(
114 '-mc', '--choice_map',
115 type = utils.Model, default = utils.Model.HMRcore,
116 choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])
117
118 parser.add_argument(
119 '-colorm', '--color_map',
120 type = str,
121 choices = ["jet", "viridis"])
122
123 args :argparse.Namespace = parser.parse_args()
124 args.net = True
125
126 return args
127
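# Illustrative command line (hypothetical file names; the exact boolean literals accepted by
# utils.Bool depend on that helper's implementation):
#   python flux_to_map.py -td /path/to/COBRAxy -co manyvsmany -pv 0.05 -fc 1.5 \
#       -op datasets -idsf fluxes_A.tsv fluxes_B.tsv -naf ConditionA ConditionB \
#       -gs true -gp true -colorm viridis -ol flux_to_map.log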
128 ############################ dataset input ####################################
129 def read_dataset(data :str, name :str) -> pd.DataFrame:
130 """
131 Tries to read the dataset from its path (data) as a tsv and turns it into a DataFrame.
132
133 Args:
134 data : filepath of a dataset (from frontend input params or literals upon calling)
135 name : name associated with the dataset (from frontend input params or literals upon calling)
136
137 Returns:
138 pd.DataFrame : dataset in a runtime operable shape
139
140 Raises:
141 sys.exit : if there's no data (pd.errors.EmptyDataError) or if the dataset has less than 2 columns
142 """
143 try:
144 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
145 except pd.errors.EmptyDataError:
146 sys.exit('Execution aborted: wrong format of ' + name + '\n')
147 if len(dataset.columns) < 2:
148 sys.exit('Execution aborted: wrong format of ' + name + '\n')
149 return dataset
150
151 ############################ dataset name #####################################
152 def name_dataset(name_data :str, count :int) -> str:
153 """
154 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset"; if the user didn't change it, this function appends f"_{count}" to make it unique.
155
156 Args:
157 name_data : name associated with the dataset (from frontend input params)
158 count : counter from 1 to make these names unique (external)
159
160 Returns:
161 str : the name made unique
162 """
163 if str(name_data) == 'Dataset':
164 return str(name_data) + '_' + str(count)
165 else:
166 return str(name_data)
167
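# Illustrative examples of the naming rule above:
#   name_dataset("Dataset", 2)  -> "Dataset_2"   (default name, made unique)
#   name_dataset("MyFluxes", 2) -> "MyFluxes"    (user-provided name, kept as-is)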
168 ############################ map_methods ######################################
169 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
170 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
171 """
172 Calculates the fold change between two average flux values.
173
174 Args:
175 avg1 : average flux value from one dataset; avg2 : average flux value from the other dataset
176
177 Returns:
178 FoldChange :
179 0 : when both input values are 0
180 "-INF" : when avg1 is 0
181 "INF" : when avg2 is 0
182 float : for any other combination of values
183 """
184 if avg1 == 0 and avg2 == 0:
185 return 0
186 elif avg1 == 0:
187 return '-INF'
188 elif avg2 == 0:
189 return 'INF'
190 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) with threshold_F_C > 1
191 return (avg1 - avg2) / (abs(avg1) + abs(avg2))
192
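# Worked example (illustrative): fold_change(3.0, 1.0) == (3 - 1) / (3 + 1) == 0.5, while
# fold_change(0, 5.0) == '-INF' and fold_change(0, 0) == 0. Note that, despite its name, the
# returned value is a normalized difference bounded in [-1, 1], not a ratio-based fold change.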
193 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
194 """
195 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
196
197 Args:
198 l : current style string of an SVG element
199 col : new value for the "stroke" style property
200 width : new value for the "stroke-width" style property
201 dash : new value for the "stroke-dasharray" style property
202
203 Returns:
204 str : the fixed style string
205 """
206 tmp = l.split(';')
207 flag_col = False
208 flag_width = False
209 flag_dash = False
210 for i in range(len(tmp)):
211 if tmp[i].startswith('stroke:'):
212 tmp[i] = 'stroke:' + col
213 flag_col = True
214 if tmp[i].startswith('stroke-width:'):
215 tmp[i] = 'stroke-width:' + width
216 flag_width = True
217 if tmp[i].startswith('stroke-dasharray:'):
218 tmp[i] = 'stroke-dasharray:' + dash
219 flag_dash = True
220 if not flag_col:
221 tmp.append('stroke:' + col)
222 if not flag_width:
223 tmp.append('stroke-width:' + width)
224 if not flag_dash:
225 tmp.append('stroke-dasharray:' + dash)
226 return ';'.join(tmp)
227
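# Illustrative example: unrelated style properties are preserved, stroke properties are
# replaced when present and appended otherwise:
#   fix_style("fill:#000000;stroke:#000000", "#ecac68", "6", "none")
#   -> "fill:#000000;stroke:#ecac68;stroke-width:6;stroke-dasharray:none"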
228 # The type of d values is collapsed, losing precision, because the dict contains lists instead of tuples, please fix!
229 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
230 """
231 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
232
233 Args:
234 d : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys)
235 core_map : SVG map to modify
236 threshold_P_V : threshold for a p-value to be considered significant
237 threshold_F_C : threshold for a fold change value to be considered significant
238 max_z_score : highest z-score (absolute value)
239
240 Returns:
241 ET.ElementTree : the modified core_map
242
243 Side effects:
244 core_map : mut
245 """
246 maxT = 12
247 minT = 2
248 grey = '#BEBEBE'
249 blue = '#6495ed'
250 red = '#ecac68'
251 for el in core_map.iter():
252 el_id = str(el.get('id'))
253 if el_id.startswith('R_'):
254 tmp = d.get(el_id[2:])
255 if tmp != None:
256 p_val :float = tmp[0]
257 f_c = tmp[1]
258 z_score = tmp[2]
259 if p_val < threshold_P_V:
260 if not isinstance(f_c, str):
261 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): #
262 col = grey
263 width = str(minT)
264 else:
265 if f_c < 0:
266 col = blue
267 elif f_c > 0:
268 col = red
269 width = str(max((abs(z_score) * maxT) / max_z_score, minT))
270 else:
271 if f_c == '-INF':
272 col = blue
273 elif f_c == 'INF':
274 col = red
275 width = str(maxT)
276 dash = 'none'
277 else:
278 dash = '5,5'
279 col = grey
280 width = str(minT)
281 el.set('style', fix_style(el.get('style', ""), col, width, dash))
282 return core_map
283
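# Illustrative threshold check: with the default -fc value of 1.5, the cutoff used above is
# (1.5 - 1) / (abs(1.5) + 1) == 0.2, so reactions whose normalized fold change has absolute
# value below 0.2 are drawn as thin grey arrows even when their p-value is significant.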
284 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
285 """
286 Finds any element in the given map with the given ID. ID uniqueness in an svg file is recommended but
287 not enforced; if more than one element with the exact ID is found, only the first is returned.
288
289 Args:
290 reactionId (str): exact ID of the requested element.
291 metabMap (ET.ElementTree): metabolic map containing the element.
292
293 Returns:
294 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
295 """
296 return utils.Result.Ok(
297 f"//*[@id=\"{reactionId}\"]").map(
298 lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
299 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
300 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user.
301
302 def styleMapElement(element :ET.Element, styleStr :str) -> None:
303 currentStyles :str = element.get("style", "")
304 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
305 currentStyles = ';'.join(currentStyles.split(';')[:-3])
306
307 element.set("style", currentStyles + styleStr)
308
309 class ReactionDirection(Enum):
310 Unknown = ""
311 Direct = "_F"
312 Inverse = "_B"
313
314 @classmethod
315 def fromDir(cls, s :str) -> "ReactionDirection":
316 # vvv as long as there's so few variants I actually condone the if spam:
317 if s == ReactionDirection.Direct.value: return ReactionDirection.Direct
318 if s == ReactionDirection.Inverse.value: return ReactionDirection.Inverse
319 return ReactionDirection.Unknown
320
321 @classmethod
322 def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
323 return ReactionDirection.fromDir(reactionId[-2:])
324
325 def getArrowBodyElementId(reactionId :str) -> str:
326 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
327 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
328 return f"R_{reactionId}"
329
330 def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
331 """
332 We attempt extracting the direction information from the provided reaction ID, if unsuccessful we provide the IDs of both directions.
333
334 Args:
335 reactionId : the provided reaction ID.
336
337 Returns:
338 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
339 """
340 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
341 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
342 return f"F_{reactionId}", f"B_{reactionId}"
343
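# Illustrative examples (hypothetical reaction IDs):
#   getArrowHeadElementId("MyRxn_F") -> ("F_MyRxn", "")           # direction known, single head
#   getArrowHeadElementId("MyRxn")   -> ("F_MyRxn", "B_MyRxn")    # direction unknown, try both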
344 class ArrowColor(Enum):
345 """
346 Encodes possible arrow colors based on their meaning in the enrichment process.
347 """
348 Invalid = "#BEBEBE" # gray, fold-change under threshold
349 Transparent = "#ffffff00" # white, not significant p-value
350 UpRegulated = "#ecac68" # red, up-regulated reaction
351 DownRegulated = "#6495ed" # blue, down-regulated reaction
352
353 UpRegulatedInv = "#FF0000"
354 # ^^^ pure red (unlike the orange tint used for UpRegulated), marking an up-regulated net value for a reversible reaction with
355 # conflicting enrichment in the two directions.
356
357 DownRegulatedInv = "#0000FF"
358 # ^^^ pure blue (unlike the lighter cornflower blue used for DownRegulated), marking a down-regulated net value for a reversible reaction with
359 # conflicting enrichment in the two directions.
360
361 @classmethod
362 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
363 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv)
364 return colors[useAltColor]
365
366 def __str__(self) -> str: return self.value
367
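# Illustrative example: ArrowColor.fromFoldChangeSign(-0.4) is ArrowColor.DownRegulated, while
# ArrowColor.fromFoldChangeSign(0.4, useAltColor = True) is ArrowColor.UpRegulatedInv.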
368 class Arrow:
369 """
370 Models the properties of a reaction arrow that change based on enrichment.
371 """
372 MIN_W = 2
373 MAX_W = 12
374
375 def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
376 """
377 (Private) Initializes an instance of Arrow.
378
379 Args:
380 width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
381 col : color of the arrow.
382 isDashed : whether the arrow should be dashed, meaning the associated pValue resulted not significant.
383
384 Returns:
385 None : practically, an Arrow instance.
386 """
387 self.w = width
388 self.col = col
389 self.dash = isDashed
390
391 def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
392 if getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr)).isErr:
393 ERRORS.append(reactionId)
394
395 def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
396 if not mindReactionDir:
397 return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
398
399 # Now we style the arrow head(s):
400 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
401 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
402 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
403
404 def styleReactionElementsMeanMedian(self, metabMap :ET.ElementTree, reactionId :str, isNegative:bool) -> None:
405
406 self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
407 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
408
409 if(isNegative):
410 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
411 self.col = ArrowColor.Transparent
412 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) #trasp
413 else:
414 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
415 self.col = ArrowColor.Transparent
416 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) #trasp
417
418
419
420 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
421 """
422 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
423
424 Args:
425 reactionId: the reaction ID, as encoded in the dataset.
426 mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
427
428 Returns:
429 str : the ID of an arrow's body or tips in the map.
430 """
431 # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
432 if not mindReactionDir: return "R_" + reactionId
433
434 #TODO: this is clearly something we need to make consistent in fluxes
435 return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[-2:] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
436
437 def toStyleStr(self, *, downSizedForTips = False) -> str:
438 """
439 Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
440
441 Returns:
442 str : the styles string.
443 """
444 width = self.w
445 if downSizedForTips: width *= 0.8
446 return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
447
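# Illustrative example: Arrow(6, ArrowColor.UpRegulated).toStyleStr() returns
# ";stroke:#ecac68;stroke-width:6;stroke-dasharray:none", and with downSizedForTips = True the
# width becomes 4.8 (80% of the body width), which is what the arrow tips receive.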
448 # vvv These constants could live inside the class itself as static properties, but Python
449 # makes that clunky, so module-level constants it is!
450 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
451 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
452
453 def applyFluxesEnrichmentToMap(fluxesEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
454 """
455 Applies fluxes enrichment results to the provided metabolic map.
456
457 Args:
458 fluxesEnrichmentRes : fluxes enrichment results.
459 metabMap : the metabolic map to edit.
460 maxNumericZScore : biggest finite z-score value found.
461
462 Side effects:
463 metabMap : mut
464
465 Returns:
466 None
467 """
468 for reactionId, values in fluxesEnrichmentRes.items():
469 pValue = values[0]
470 foldChange = values[1]
471 z_score = values[2]
472
473 if isinstance(foldChange, str): foldChange = float(foldChange)
474 if pValue >= ARGS.pValue: # pValue above threshold: dashed arrow
475 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
476 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
477
478 continue
479
480 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
481 INVALID_ARROW.styleReactionElements(metabMap, reactionId)
482 INVALID_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
483
484 continue
485
486 width = Arrow.MAX_W
487 if not math.isinf(foldChange):
488 try:
489 width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W)
490
491 except ZeroDivisionError: pass
492
493 #if not reactionId.endswith("_RV"): # RV stands for reversible reactions
494 # Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
495 # continue
496
497 #reactionId = reactionId[:-3] # Remove "_RV"
498
499 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score
500 if inversionScore == 2: foldChange *= -1
501 # ^^^ Style the inverse direction with the opposite sign netValue
502
503 # If the score is 1 (opposite signs) we use alternative colors vvv
504 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
505
506 # vvv These 2 if statements can both be true and can both happen
507 if ARGS.net: # style arrow head(s):
508 arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))
509 arrow.applyTo(("F_" if inversionScore == 2 else "B_") + reactionId, metabMap, f";stroke:{ArrowColor.Transparent};stroke-width:0;stroke-dasharray:none")
510
511 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
512
513
514 ############################ split class ######################################
515 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
516 """
517 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
518
519 Args:
520 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
521 resolve_rules : a :dict containing :float data
522
523 Returns:
524 dict : the dict with data grouped by class
525
526 Side effects:
527 classes : mut
528 """
529 class_pat :Dict[str, List[List[float]]] = {}
530 for i in range(len(classes)):
531 classe :str = classes.iloc[i, 1]
532 if pd.isnull(classe): continue
533
534 l :List[List[float]] = []
535 for j in range(i, len(classes)):
536 if classes.iloc[j, 1] == classe:
537 pat_id :str = classes.iloc[j, 0]
538 tmp = resolve_rules.get(pat_id, None)
539 if tmp != None:
540 l.append(tmp)
541 classes.iloc[j, 1] = None
542
543 if l:
544 class_pat[classe] = list(map(list, zip(*l)))
545 continue
546
547 utils.logWarning(
548 f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)
549
550 return class_pat
551
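# Illustrative example (hypothetical sample and class names): given a classes DataFrame with rows
# ("sampleA", "tumor"), ("sampleB", "tumor"), ("sampleC", "normal") and
# resolve_rules = {"sampleA": [1, 2], "sampleB": [3, 4], "sampleC": [5, 6]},
# split_class returns {"tumor": [[1, 3], [2, 4]], "normal": [[5], [6]]}:
# one inner list per reaction, holding that reaction's values across the class's samples.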
552 ############################ conversion ##############################################
553 #conversion from svg to png
554 def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
555 """
556 Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.
557
558 Args:
559 svg_path : path to SVG file
560 png_path : path for new PNG file
561 dpi : dots per inch of the generated PNG
562 scale : scaling factor for the generated PNG, computed internally when a size is provided
563 size : final effective width of the generated PNG
564
565 Returns:
566 None
567 """
568 if size:
569 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
570 scale = size / image.width
571 image = image.resize(scale)
572 else:
573 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)
574
575 white_background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
576 white_background = white_background.affine([scale, 0, 0, scale])
577
578 if white_background.bands != image.bands:
579 white_background = white_background.extract_band(0)
580
581 composite_image = white_background.composite2(image, 'over')
582 composite_image.write_to_file(png_path.show())
583
584 # single function: the files are kept out of it and passed in as inputs
585 #conversion from png to pdf
586 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
587 """
588 Internal utility to convert a PNG to PDF, used as part of the SVG-to-PDF conversion.
589
590 Args:
591 png_file : path to PNG file
592 pdf_file : path to new PDF file
593
594 Returns:
595 None
596 """
597 image = Image.open(png_file.show())
598 image = image.convert("RGB")
599 image.save(pdf_file.show(), "PDF", resolution=100.0)
600
601 #function called to reduce redundancy in the code
602 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
603 """
604 Converts the SVG map at the provided path to PDF.
605
606 Args:
607 file_svg : path to SVG file
608 file_png : path to PNG file
609 file_pdf : path to new PDF file
610
611 Returns:
612 None
613 """
614 svg_to_png_with_background(file_svg, file_png)
615 try:
616 convert_png_to_pdf(file_png, file_pdf)
617 print(f'PDF file {file_pdf.filePath} successfully generated.')
618
619 except Exception as e:
620 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
621
622 ############################ map ##############################################
623 def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
624 """
625 Builds a FilePath instance from the names of confronted datasets ready to point to a location in the
626 "result/" folder, used by this tool for output files in collections.
627
628 Args:
629 dataset1Name : name of the first dataset in the comparison.
630 dataset2Name : name of the second dataset. Defaults to "rest".
631 details : extra detail (e.g. output kind) to embed in the file name.
632 ext : file format (extension) of the output file.
633
634 Returns:
635 utils.FilePath : path pointing to the output file inside the "result/" collection.
636 """
637 # This function returns a util data structure but is extremely specific to this module.
638 # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
639 # TODO: until a third tool with multiple outputs appears before porting this to utils.
640 return utils.FilePath(
641 f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
642 # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
643 # all output files: I don't care, this was never the performance bottleneck of the tool and
644 # there is no other net gain in saving and re-using the built string.
645 ext,
646 prefix = "result")
647
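# Illustrative example (the exact path string is produced by utils.FilePath):
#   buildOutputPath("ConditionA", "ConditionB", details = "SVG Map", ext = utils.FileFormat.SVG)
# points to something like "result/ConditionA_vs_ConditionB (SVG Map).svg".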
648 FIELD_NOT_AVAILABLE = '/'
649 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
650 fieldsAmt = len(fieldNames)
651 with open(outPath.show(), "w", newline = "") as fd:
652 writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
653 writer.writeheader()
654
655 for row in rows:
656 sizeMismatch = fieldsAmt - len(row)
657 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
658 writer.writerow({ field : data for field, data in zip(fieldNames, row) })
659
660 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
661 def writeTabularResult(enrichedScores : OldEnrichedScores, outPath :utils.FilePath) -> None:
662 fieldNames = ["ids", "P_Value", "fold change"]
663 fieldNames.extend(["average_1", "average_2"])
664
665 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
666
667 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest") -> None:
668 # this function compiles the things always in common between comparison modes after enrichment.
669 # TODO: organize, name better.
670 writeTabularResult(tmp, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV))
671 for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
672 applyFluxesEnrichmentToMap(tmp, core_map, max_z_score)
673
674 def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
675 """
676 Computes the statistical significance score (P-value) of the comparison between coherent data
677 from two datasets. The data is supposed to, in both datasets:
678 - be related to the same reaction ID;
679 - be ordered by sample, such that the item at position i in both lists is related to the
680 same sample or cell line.
681
682 Args:
683 dataset1Data : data from the 1st dataset.
684 dataset2Data : data from the 2nd dataset.
685
686 Returns:
687 tuple: (P-value, Z-score)
688 - P-value from a Kolmogorov-Smirnov test on the provided data.
689 - Z-score of the difference between means of the two datasets.
690 """
691 # Perform Kolmogorov-Smirnov test
692 ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data)
693
694 # Calculate means and standard deviations
695 mean1 = np.mean(dataset1Data)
696 mean2 = np.mean(dataset2Data)
697 std1 = np.std(dataset1Data, ddof=1)
698 std2 = np.std(dataset2Data, ddof=1)
699
700 n1 = len(dataset1Data)
701 n2 = len(dataset2Data)
702
703 # Calculate Z-score
704 z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))
705
706 return p_value, z_score
707
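# Worked example (illustrative): with dataset1Data = [1, 2, 3] and dataset2Data = [2, 4, 6],
# mean1 = 2, mean2 = 4 and, with ddof = 1, std1 = 1, std2 = 2, so
#   z_score = (2 - 4) / sqrt(1**2 / 3 + 2**2 / 3) ≈ -1.55
# while p_value is whatever st.ks_2samp returns for those two samples.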
708 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
709 #TODO: the following code still suffers from "dumbvarnames-osis"
710 tmp :Dict[str, List[Union[float, FoldChange]]] = {}
711 count = 0
712 max_z_score = 0
713
714 for l1, l2 in zip(dataset1Data, dataset2Data):
715 reactId = ids[count]
716 count += 1
717 if not reactId: continue # we skip ids that have already been processed
718
719 try:
720 p_value, z_score = computePValue(l1, l2)
721 avg1 = sum(l1) / len(l1)
722 avg2 = sum(l2) / len(l2)
723 avg = fold_change(avg1, avg2)
724 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
725 tmp[reactId] = [float(p_value), avg, z_score, avg1, avg2]
726 except (TypeError, ZeroDivisionError): continue
727
728 return tmp, max_z_score
729
730 def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str]) -> None:
731 """
732 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the
733 provided metabolic map.
734
735 Args:
736 metabMap : SVG map to modify.
737 class_pat : the clustered data.
738 ids : ids for data association.
739
740
741 Returns:
742 None
743
744 Raises:
745 sys.exit : if there are less than 2 classes for comparison
746
747 Side effects:
748 metabMap : mut
749 ids : mut
750 """
751 class_pat = { k.strip() : v for k, v in class_pat.items() }
752 #TODO: simplify this stuff vvv and stop using sys.exit (raise the correct utils error)
753 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: fewer than two classes were provided for comparison\n')
754
755 if ARGS.comparison == "manyvsmany":
756 for i, j in it.combinations(class_pat.keys(), 2):
757 #TODO: these 2 functions are always called in pair and in this order and need common data,
758 # some clever refactoring would be appreciated.
759 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
760 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, i, j)
761
762 elif ARGS.comparison == "onevsrest":
763 for single_cluster in class_pat.keys():
764 t :List[List[List[float]]] = []
765 for k in class_pat.keys():
766 if k != single_cluster:
767 t.append(class_pat.get(k))
768
769 rest :List[List[float]] = []
770 for i in t:
771 rest = rest + i
772
773 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
774 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, single_cluster)
775
776 elif ARGS.comparison == "onevsmany":
777 controlItems = class_pat.get(ARGS.control)
778 for otherDataset in class_pat.keys():
779 if otherDataset == ARGS.control: continue
780
781 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
782 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, ARGS.control, otherDataset)
783
784 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
785 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG)
786 utils.writeSvg(svgFilePath, core_map)
787
788 if ARGS.generate_pdf:
789 pngPath = buildOutputPath(dataset1Name, dataset2Name, details = "PNG Map", ext = utils.FileFormat.PNG)
790 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details = "PDF Map", ext = utils.FileFormat.PDF)
791 convert_to_pdf(svgFilePath, pngPath, pdfPath)
792
793 if not ARGS.generate_svg: os.remove(svgFilePath.show())
794
795 ClassPat = Dict[str, List[List[float]]]
796 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
797 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
798 # for the sake of everyone's sanity.
799 class_pat :ClassPat = {}
800 if ARGS.option == 'datasets':
801 num = 1 #TODO: the dataset naming function could be a generator
802 for path, name in zip(datasetsPaths, names):
803 name = name_dataset(name, num)
804 resolve_rules_float, ids = getDatasetValues(path, name)
805 if resolve_rules_float != None:
806 class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
807
808 num += 1
809
810 elif ARGS.option == "dataset_class":
811 classes = read_dataset(classPath, "class")
812 classes = classes.astype(str)
813
814 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
815 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float)
816
817 return ids, class_pat
818 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
819
820 #TODO: create these damn args as FilePath objects
821 def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
822 """
823 Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
824
825 Args:
826 datasetPath : path to the dataset
827 datasetName (str): dataset name, used in error reporting
828
829 Returns:
830 Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
831 """
832 dataset = read_dataset(datasetPath, datasetName)
833 IDs = pd.Series.tolist(dataset.iloc[:, 0].astype(str))
834
835 dataset = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
836 return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
837
838 def rgb_to_hex(rgb):
839 """
840 Convert RGB values (0-1 range) to hexadecimal color format.
841
842 Args:
843 rgb (numpy.ndarray): An array of RGB color components (in the range [0, 1]).
844
845 Returns:
846 str: The color in hexadecimal format (e.g., '#ff0000' for red).
847 """
848 # Convert RGB values (0-1 range) to hexadecimal format
849 rgb = (np.array(rgb) * 255).astype(int)
850 return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])
851
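# Illustrative example: rgb_to_hex(np.array([1.0, 0.0, 0.0])) -> '#ff0000'. Passing a matplotlib
# colormap sample such as cmap(0.5) also works: only the first three components are formatted,
# so the alpha channel is ignored.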
852
853
854 def save_colormap_image(min_value: float, max_value: float, path: utils.FilePath, colorMap:str="viridis"):
855 """
856 Create and save an image of the colormap showing the gradient and its range.
857
858 Args:
859 min_value (float): The minimum value of the colormap range.
860 max_value (float): The maximum value of the colormap range.
861 path (utils.FilePath): destination path for the image; colorMap (str): name of the matplotlib colormap to render.
862 """
863
864 # Create a colormap using matplotlib
865 cmap = plt.get_cmap(colorMap)
866
867 # Create a figure and axis
868 fig, ax = plt.subplots(figsize=(6, 1))
869 fig.subplots_adjust(bottom=0.5)
870
871 # Create a gradient image
872 gradient = np.linspace(0, 1, 256)
873 gradient = np.vstack((gradient, gradient))
874
875 # Add min and max value annotations
876 ax.text(0, 0.5, f'{np.round(min_value, 3)}', va='center', ha='right', transform=ax.transAxes, fontsize=12, color='black')
877 ax.text(1, 0.5, f'{np.round(max_value, 3)}', va='center', ha='left', transform=ax.transAxes, fontsize=12, color='black')
878
879
880 # Display the gradient image
881 ax.imshow(gradient, aspect='auto', cmap=cmap)
882 ax.set_axis_off()
883
884 # Save the image
885 plt.savefig(path.show(), bbox_inches='tight', pad_inches=0)
886 plt.close()
887 pass
888
889 def min_nonzero_abs(arr):
890 # Flatten the array and filter out zeros, then find the minimum of the remaining values
891 non_zero_elements = np.abs(arr)[np.abs(arr) > 0]
892 return np.min(non_zero_elements) if non_zero_elements.size > 0 else None
893
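# Illustrative example: min_nonzero_abs(np.array([0.0, -0.2, 0.5])) == 0.2, while an all-zero
# array yields None (which would break the downstream min() over classes, so at least one
# non-zero flux per class is assumed).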
894 def computeEnrichmentMeanMedian(metabMap: ET.ElementTree, class_pat: Dict[str, List[List[float]]], ids: List[str], colormap:str) -> None:
895 """
896 Compute and visualize the metabolic map based on mean and median of the input fluxes.
897 The fluxes are normalised across classes/datasets and visualised using the given colormap.
898
899 Args:
900 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
901 class_pat (Dict[str, List[List[float]]]): A dictionary where keys are class names and values are lists of enrichment values.
902 ids (List[str]): A list of reaction IDs to be used for coloring arrows. colormap (str): name of the matplotlib colormap used for the visualisation.
903
904 Returns:
905 None
906 """
907 # Create copies only if they are needed
908 metabMap_mean = copy.deepcopy(metabMap)
909 metabMap_median = copy.deepcopy(metabMap)
910
911 # Compute medians and means
912 medians = {key: np.round(np.median(np.array(value), axis=1), 6) for key, value in class_pat.items()}
913 means = {key: np.round(np.mean(np.array(value), axis=1),6) for key, value in class_pat.items()}
914
915 # Normalize medians and means
916 max_flux_medians = max(np.max(np.abs(arr)) for arr in medians.values())
917 max_flux_means = max(np.max(np.abs(arr)) for arr in means.values())
918
919 min_flux_medians = min(min_nonzero_abs(arr) for arr in medians.values())
920 min_flux_means = min(min_nonzero_abs(arr) for arr in means.values())
921
922 medians = {key: median/max_flux_medians for key, median in medians.items()}
923 means = {key: mean/max_flux_means for key, mean in means.items()}
924
925 save_colormap_image(min_flux_medians, max_flux_medians, utils.FilePath("Color map median", ext=utils.FileFormat.PNG, prefix="result"), colormap)
926 save_colormap_image(min_flux_means, max_flux_means, utils.FilePath("Color map mean", ext=utils.FileFormat.PNG, prefix="result"), colormap)
927
928 cmap = plt.get_cmap(colormap)
929
930 for key in class_pat:
931 # Create color mappings for median and mean
932 colors_median = {
933 rxn_id: rgb_to_hex(cmap(abs(medians[key][i]))) if medians[key][i] != 0 else '#bebebe' #grey blocked
934 for i, rxn_id in enumerate(ids)
935 }
936
937 colors_mean = {
938 rxn_id: rgb_to_hex(cmap(abs(means[key][i]))) if means[key][i] != 0 else '#bebebe' #grey blocked
939 for i, rxn_id in enumerate(ids)
940 }
941
942 for i, rxn_id in enumerate(ids):
943 isNegative = medians[key][i] < 0
944
945 # Apply median arrows
946 apply_arrow(metabMap_median, rxn_id, colors_median[rxn_id], isNegative)
947
948 isNegative = means[key][i] < 0
949 # Apply mean arrows
950 apply_arrow(metabMap_mean, rxn_id, colors_mean[rxn_id], isNegative)
951
952 # Save and convert the SVG files
953 save_and_convert(metabMap_mean, "mean", key)
954 save_and_convert(metabMap_median, "median", key)
955
956 def apply_arrow(metabMap, rxn_id, color, isNegative):
957 """
958 Apply an arrow to a specific reaction in the metabolic map with a given color.
959
960 Args:
961 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
962 rxn_id (str): The ID of the reaction to which the arrow will be applied.
963 color (str): The color of the arrow in hexadecimal format. isNegative (bool): whether the value is negative, deciding which arrow head stays colored.
964
965 Returns:
966 None
967 """
968 arrow = Arrow(width=5, col=color)
969 arrow.styleReactionElementsMeanMedian(metabMap, rxn_id, isNegative)
970 pass
971
972 def save_and_convert(metabMap, map_type, key):
973 """
974 Save the metabolic map as an SVG file and optionally convert it to PNG and PDF formats.
975
976 Args:
977 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
978 map_type (str): The type of map ('mean' or 'median').
979 key (str): The key identifying the specific map.
980
981 Returns:
982 None
983 """
984 svgFilePath = utils.FilePath(f"SVG Map {map_type} - {key}", ext=utils.FileFormat.SVG, prefix="result")
985 utils.writeSvg(svgFilePath, metabMap)
986 if ARGS.generate_pdf:
987 pngPath = utils.FilePath(f"PNG Map {map_type} - {key}", ext=utils.FileFormat.PNG, prefix="result")
988 pdfPath = utils.FilePath(f"PDF Map {map_type} - {key}", ext=utils.FileFormat.PDF, prefix="result")
989 convert_to_pdf(svgFilePath, pngPath, pdfPath)
990 if not ARGS.generate_svg:
991 os.remove(svgFilePath.show())
992
993
994
995
996 ############################ MAIN #############################################
997 def main() -> None:
998 """
999 Initializes everything and sets the program in motion based on the frontend input arguments.
1000
1001 Returns:
1002 None
1003
1004 Raises:
1005 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
1006 """
1007
1008 global ARGS
1009 ARGS = process_args()
1010
1011 if not os.path.isdir('result'): os.makedirs('result')
1012
1013 core_map :ET.ElementTree = ARGS.choice_map.getMap(
1014 ARGS.tool_dir,
1015 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
1016 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given.
1017 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner
1018 # solution can be derived from my comment in FilePath.fromStrPath
1019
1020 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_fluxes, ARGS.input_data_fluxes, ARGS.input_class_fluxes, ARGS.names_fluxes)
1021
1022 if(ARGS.choice_map == utils.Model.HMRcore):
1023 temp_map = utils.Model.HMRcore_no_legend
1024 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1025 elif(ARGS.choice_map == utils.Model.ENGRO2):
1026 temp_map = utils.Model.ENGRO2_no_legend
1027 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1028 else:
1029 computeEnrichmentMeanMedian(core_map, class_pat, ids, ARGS.color_map)
1030
1031
1032 computeEnrichment(core_map, class_pat, ids)
1033
1034 # create output files: TODO: this is the same comparison happening in "maps", find a better way to organize this
1035 if ARGS.comparison == "manyvsmany":
1036 for i, j in it.combinations(class_pat.keys(), 2): createOutputMaps(i, j, core_map)
1037 return
1038
1039 if ARGS.comparison == "onevsrest":
1040 for single_cluster in class_pat.keys(): createOutputMaps(single_cluster, "rest", core_map)
1041 return
1042
1043 for otherDataset in class_pat.keys():
1044 if otherDataset != ARGS.control: createOutputMaps(ARGS.control, otherDataset, core_map)
1045
1046 if not ERRORS: return
1047 utils.logWarning(
1048 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}",
1049 ARGS.out_log)
1050
1051 print('Execution succeeded')
1052
1053 ###############################################################################
1054 if __name__ == "__main__":
1055 main()