comparison COBRAxy/utils/flux_to_map.py @ 233:55ad01b84c7f draft

Uploaded
author luca_milaz
date Tue, 17 Dec 2024 11:04:35 +0000
1 from __future__ import division
2 import csv
3 from enum import Enum
4 import re
5 import sys
6 import numpy as np
7 import pandas as pd
8 import itertools as it
9 import scipy.stats as st
10 import lxml.etree as ET
11 import math
12 import utils.general_utils as utils
13 from PIL import Image
14 import os
15 import copy
16 import argparse
17 import pyvips
18 from PIL import Image, ImageDraw, ImageFont
19 from typing import Tuple, Union, Optional, List, Dict
20 import matplotlib.pyplot as plt
21
22 ERRORS = []
23 ########################## argparse ##########################################
24 ARGS :argparse.Namespace
25 def process_args(args:List[str] = None) -> argparse.Namespace:
26 """
27 Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
28
29 Args:
30 args : Always obtained (in file) from sys.argv
31
32 Returns:
33 Namespace : An object containing the parsed arguments
34 """
35 parser = argparse.ArgumentParser(
36 usage = "%(prog)s [options]",
37 description = "Processes flux values to create a comparison map.")
38
39 #General:
40 parser.add_argument(
41 '-td', '--tool_dir',
42 type = str,
43 required = True,
44 help = 'your tool directory')
45
46 parser.add_argument('-on', '--control', type = str)
47 parser.add_argument('-ol', '--out_log', help = "Output log")
48
49 #Computation details:
50 parser.add_argument(
51 '-co', '--comparison',
52 type = str,
53 default = 'manyvsmany',
54 choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
55
56 parser.add_argument(
57 '-pv' ,'--pValue',
58 type = float,
59 default = 0.1,
60 help = 'P-Value threshold (default: %(default)s)')
61
62 parser.add_argument(
63 '-fc', '--fChange',
64 type = float,
65 default = 1.5,
66 help = 'Fold-Change threshold (default: %(default)s)')
67
68
69 parser.add_argument(
70 '-op', '--option',
71 type = str,
72 choices = ['datasets', 'dataset_class'],
73 help = 'input mode: separate datasets, or a single dataset plus a class file')
74
75 parser.add_argument(
76 '-idf', '--input_data_fluxes',
77 type = str,
78 help = 'input dataset fluxes')
79
80 parser.add_argument(
81 '-icf', '--input_class_fluxes',
82 type = str,
83 help = 'sample group specification fluxes')
84
85 parser.add_argument(
86 '-idsf', '--input_datas_fluxes',
87 type = str,
88 nargs = '+',
89 help = 'input datasets fluxes')
90
91 parser.add_argument(
92 '-naf', '--names_fluxes',
93 type = str,
94 nargs = '+',
95 help = 'input names fluxes')
96
97 #Output:
98 parser.add_argument(
99 "-gs", "--generate_svg",
100 type = utils.Bool("generate_svg"), default = True,
101 help = "choose whether to generate svg")
102
103 parser.add_argument(
104 "-gp", "--generate_pdf",
105 type = utils.Bool("generate_pdf"), default = True,
106 help = "choose whether to generate pdf")
107
108 parser.add_argument(
109 '-cm', '--custom_map',
110 type = str,
111 help='custom map to use')
112
113 parser.add_argument(
114 '-mc', '--choice_map',
115 type = utils.Model, default = utils.Model.HMRcore,
116 choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])
117
118 parser.add_argument(
119 '-colorm', '--color_map',
120 type = str,
121 choices = ["jet", "viridis"])
122
123 parser.add_argument(
124 '-idop', '--output_path',
125 type = str,
126 default='result',
127 help = 'output path for maps')
128
129 args :argparse.Namespace = parser.parse_args(args)
130 args.net = True # TODO: since fluxes can also be negative, they are always treated as net values
131
132 return args
133
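# Illustrative invocation of process_args (a sketch: the paths and dataset names below are
# hypothetical, only the flags defined above are real):
#   ARGS = process_args([
#       "-td", "/path/to/tool_dir", "-co", "manyvsmany", "-pv", "0.05", "-fc", "1.5",
#       "-op", "datasets", "-idsf", "fluxes_a.tsv", "fluxes_b.tsv",
#       "-naf", "DatasetA", "DatasetB", "-colorm", "viridis", "-idop", "result"
#   ])
#   ARGS.pValue   # -> 0.05
#   ARGS.net      # -> True (always set above)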
134 ############################ dataset input ####################################
135 def read_dataset(data :str, name :str) -> pd.DataFrame:
136 """
137 Tries to read the dataset from its path (data) as a tsv and turns it into a DataFrame.
138
139 Args:
140 data : filepath of a dataset (from frontend input params or literals upon calling)
141 name : name associated with the dataset (from frontend input params or literals upon calling)
142
143 Returns:
144 pd.DataFrame : dataset in a runtime operable shape
145
146 Raises:
147 sys.exit : if there's no data (pd.errors.EmptyDataError) or if the dataset has less than 2 columns
148 """
149 try:
150 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
151 except pd.errors.EmptyDataError:
152 sys.exit('Execution aborted: wrong format of ' + name + '\n')
153 if len(dataset.columns) < 2:
154 sys.exit('Execution aborted: wrong format of ' + name + '\n')
155 return dataset
156
157 ############################ dataset name #####################################
158 def name_dataset(name_data :str, count :int) -> str:
159 """
160 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset"; if the user didn't change it, this function appends f"_{count}" to make it unique.
161
162 Args:
163 name_data : name associated with the dataset (from frontend input params)
164 count : counter from 1 to make these names unique (external)
165
166 Returns:
167 str : the name made unique
168 """
169 if str(name_data) == 'Dataset':
170 return str(name_data) + '_' + str(count)
171 else:
172 return str(name_data)
173
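# For example:
#   name_dataset("Dataset", 2) -> "Dataset_2"   (default name, made unique)
#   name_dataset("Tumor", 2)   -> "Tumor"       (user-chosen name, kept as is)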
174 ############################ map_methods ######################################
175 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
176 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
177 """
178 Calculates the fold change between two average flux values.
179
180 Args:
181 avg1 : average flux value from one dataset
avg2 : average flux value from the other dataset
182
183 Returns:
184 FoldChange :
185 0 : when both input values are 0
186 "-INF" : when avg1 is 0
187 "INF" : when avg2 is 0
188 float : for any other combination of values
189 """
190 if avg1 == 0 and avg2 == 0:
191 return 0
192 elif avg1 == 0:
193 return '-INF'
194 elif avg2 == 0:
195 return 'INF'
196 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) with threshold_F_C > 1
197 return (avg1 - avg2) / (abs(avg1) + abs(avg2))
198
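# Worked examples of the normalized fold change above, which stays within [-1, 1] for
# finite inputs:
#   fold_change(3.0, 1.0) -> (3 - 1) / (3 + 1) = 0.5
#   fold_change(1.0, 3.0) -> -0.5
#   fold_change(0.0, 2.0) -> "-INF"
#   fold_change(2.0, 0.0) -> "INF"
#   fold_change(0.0, 0.0) -> 0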
199 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
200 """
201 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
202
203 Args:
204 l : current style string of an SVG element
205 col : new value for the "stroke" style property
206 width : new value for the "stroke-width" style property
207 dash : new value for the "stroke-dasharray" style property
208
209 Returns:
210 str : the fixed style string
211 """
212 tmp = l.split(';')
213 flag_col = False
214 flag_width = False
215 flag_dash = False
216 for i in range(len(tmp)):
217 if tmp[i].startswith('stroke:'):
218 tmp[i] = 'stroke:' + col
219 flag_col = True
220 if tmp[i].startswith('stroke-width:'):
221 tmp[i] = 'stroke-width:' + width
222 flag_width = True
223 if tmp[i].startswith('stroke-dasharray:'):
224 tmp[i] = 'stroke-dasharray:' + dash
225 flag_dash = True
226 if not flag_col:
227 tmp.append('stroke:' + col)
228 if not flag_width:
229 tmp.append('stroke-width:' + width)
230 if not flag_dash:
231 tmp.append('stroke-dasharray:' + dash)
232 return ';'.join(tmp)
233
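# Example of fix_style on a typical SVG style string (illustrative values):
#   fix_style("fill:#000000;stroke:#000000;stroke-width:1", "#6495ed", "4", "none")
#   -> "fill:#000000;stroke:#6495ed;stroke-width:4;stroke-dasharray:none"
# (stroke and stroke-width are replaced in place; stroke-dasharray is appended since missing)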
234 # The type of d's values is collapsed, losing precision, because the dict contains lists instead of tuples. TODO: fix.
235 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
236 """
237 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
238
239 Args:
240 d : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys)
241 core_map : SVG map to modify
242 threshold_P_V : threshold for a p-value to be considered significant
243 threshold_F_C : threshold for a fold change value to be considered significant
244 max_z_score : highest z-score (absolute value)
245
246 Returns:
247 ET.ElementTree : the modified core_map
248
249 Side effects:
250 core_map : mut
251 """
252 maxT = 12
253 minT = 2
254 grey = '#BEBEBE'
255 blue = '#6495ed' # light blue
256 red = '#ecac68' # orange
257 for el in core_map.iter():
258 el_id = str(el.get('id'))
259 if el_id.startswith('R_'):
260 tmp = d.get(el_id[2:])
261 if tmp is not None:
262 p_val :float = tmp[0]
263 f_c = tmp[1]
264 z_score = tmp[2]
265 if p_val < threshold_P_V:
266 if not isinstance(f_c, str):
267 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): #
268 col = grey
269 width = str(minT)
270 else:
271 if f_c < 0:
272 col = blue
273 elif f_c > 0:
274 col = red
275 width = str(max((abs(z_score) * maxT) / max_z_score, minT))
276 else:
277 if f_c == '-INF':
278 col = blue
279 elif f_c == 'INF':
280 col = red
281 width = str(maxT)
282 dash = 'none'
283 else:
284 dash = '5,5'
285 col = grey
286 width = str(minT)
287 el.set('style', fix_style(el.get('style', ""), col, width, dash))
288 return core_map
289
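# Note on the fold-change threshold used above: the user-facing threshold_F_C (e.g. 1.5, a
# classic fold-change ratio) is mapped onto the normalized fold change computed by
# fold_change() via (threshold_F_C - 1) / (abs(threshold_F_C) + 1).
#   threshold_F_C = 1.5 -> 0.5 / 2.5 = 0.2, so |f_c| < 0.2 is drawn as a thin grey arrow.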
290 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
291 """
292 Finds any element in the given map with the given ID. ID uniqueness in an svg file is recommended but
293 not enforced; if more than one element with the exact ID is found, only the first will be returned.
294
295 Args:
296 reactionId (str): exact ID of the requested element.
297 metabMap (ET.ElementTree): metabolic map containing the element.
298
299 Returns:
300 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
301 """
302 return utils.Result.Ok(
303 f"//*[@id=\"{reactionId}\"]").map(
304 lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
305 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
306 # ^^^ we shamelessly ignore the contents of the IndexError; it offers nothing to the user.
307
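# Typical usage mirrors Arrow.applyTo further below (a sketch, relying only on the Result
# combinators already used in this file, .map and .isErr; "R_ACONT" is a hypothetical ID):
#   if getElementById("R_ACONT", metabMap).map(lambda el: styleMapElement(el, styleStr)).isErr:
#       ERRORS.append("R_ACONT")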
308 def styleMapElement(element :ET.Element, styleStr :str) -> None:
309 currentStyles :str = element.get("style", "")
310 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
311 currentStyles = ';'.join(currentStyles.split(';')[:-3])
312
313 element.set("style", currentStyles + styleStr)
314
315 class ReactionDirection(Enum):
316 Unknown = ""
317 Direct = "_F"
318 Inverse = "_B"
319
320 @classmethod
321 def fromDir(cls, s :str) -> "ReactionDirection":
322 # vvv as long as there are so few variants, the simple if checks are acceptable:
323 if s == ReactionDirection.Direct.value: return ReactionDirection.Direct
324 if s == ReactionDirection.Inverse.value: return ReactionDirection.Inverse
325 return ReactionDirection.Unknown
326
327 @classmethod
328 def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
329 return ReactionDirection.fromDir(reactionId[-2:])
330
331 def getArrowBodyElementId(reactionId :str) -> str:
332 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
333 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
334 return f"R_{reactionId}"
335
336 def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
337 """
338 We attempt extracting the direction information from the provided reaction ID, if unsuccessful we provide the IDs of both directions.
339
340 Args:
341 reactionId : the provided reaction ID.
342
343 Returns:
344 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
345 """
346 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
347 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
348 return f"F_{reactionId}", f"B_{reactionId}"
349
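# Examples of the ID translations above (reaction names are hypothetical):
#   getArrowBodyElementId("ACONT_F")  -> "R_ACONT"               (direction suffix stripped)
#   getArrowBodyElementId("ACONT_RV") -> "R_ACONT"               (_RV suffix stripped)
#   getArrowHeadElementId("ACONT")    -> ("F_ACONT", "B_ACONT")  (unknown direction: try both)
#   getArrowHeadElementId("ACONT_F")  -> ("F_ACONT", "")         (direction known: single ID)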
350 class ArrowColor(Enum):
351 """
352 Encodes possible arrow colors based on their meaning in the enrichment process.
353 """
354 Invalid = "#BEBEBE" # gray, fold-change under threshold
355 Transparent = "#ffffff00" # white, not significant p-value
356 UpRegulated = "#ecac68" # red, up-regulated reaction
357 DownRegulated = "#6495ed" # blue, down-regulated reaction
358
359 UpRegulatedInv = "#FF0000"
360 # ^^^ a more saturated shade of red, used for the up-regulated net value of a reversible reaction with
361 # conflicting enrichment in the two directions.
362
363 DownRegulatedInv = "#0000FF"
364 # ^^^ a more saturated shade of blue, used for the down-regulated net value of a reversible reaction with
365 # conflicting enrichment in the two directions.
366
367 @classmethod
368 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
369 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv)
370 return colors[useAltColor]
371
372 def __str__(self) -> str: return self.value
373
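# For example:
#   ArrowColor.fromFoldChangeSign(-0.4)                    -> ArrowColor.DownRegulated
#   ArrowColor.fromFoldChangeSign(0.4, useAltColor = True) -> ArrowColor.UpRegulatedInv
# (the bool is used as a tuple index: False -> base color, True -> alternative color)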
374 class Arrow:
375 """
376 Models the properties of a reaction arrow that change based on enrichment.
377 """
378 MIN_W = 2
379 MAX_W = 12
380
381 def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
382 """
383 (Private) Initializes an instance of Arrow.
384
385 Args:
386 width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
387 col : color of the arrow.
388 isDashed : whether the arrow should be dashed, meaning the associated pValue was not significant.
389
390 Returns:
391 None : practically, an Arrow instance.
392 """
393 self.w = width
394 self.col = col
395 self.dash = isDashed
396
397 def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
398 if getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr)).isErr:
399 ERRORS.append(reactionId)
400
401 def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
402 if not mindReactionDir:
403 return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
404
405 # Now we style the arrow head(s):
406 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
407 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
408 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
409
410 def styleReactionElementsMeanMedian(self, metabMap :ET.ElementTree, reactionId :str, isNegative:bool) -> None:
411
412 self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
413 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
414
415 if isNegative:
416 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
417 self.col = ArrowColor.Transparent
418 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) # transparent tip
419 else:
420 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
421 self.col = ArrowColor.Transparent
422 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) # transparent tip
423
424
425
426 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
427 """
428 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
429
430 Args:
431 reactionId: the reaction ID, as encoded in the dataset.
432 mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
433
434 Returns:
435 str : the ID of an arrow's body or tips in the map.
436 """
437 # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
438 if not mindReactionDir: return "R_" + reactionId
439
440 #TODO: this is clearly something we need to make consistent in fluxes
441 return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[-2:] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
442
443 def toStyleStr(self, *, downSizedForTips = False) -> str:
444 """
445 Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
446
447 Returns:
448 str : the styles string.
449 """
450 width = self.w
451 if downSizedForTips: width *= 0.8
452 return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
453
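# Example of the style fragment produced by toStyleStr (it starts with ';' so it can be
# appended to an existing style attribute by styleMapElement):
#   Arrow(Arrow.MAX_W, ArrowColor.UpRegulated).toStyleStr()
#   -> ";stroke:#ecac68;stroke-width:12;stroke-dasharray:none"
#   Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True).toStyleStr()
#   -> ";stroke:#BEBEBE;stroke-width:2;stroke-dasharray:5,5"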
454 # vvv These constants could live inside the class itself as static properties, but module-level
455 # constants are simpler to reference, so here we are!
456 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
457 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
458
459 def applyFluxesEnrichmentToMap(fluxesEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
460 """
461 Applies fluxes enrichment results to the provided metabolic map.
462
463 Args:
464 fluxesEnrichmentRes : fluxes enrichment results.
465 metabMap : the metabolic map to edit.
466 maxNumericZScore : biggest finite z-score value found.
467
468 Side effects:
469 metabMap : mut
470
471 Returns:
472 None
473 """
474 for reactionId, values in fluxesEnrichmentRes.items():
475 pValue = values[0]
476 foldChange = values[1]
477 z_score = values[2]
478
479 if isinstance(foldChange, str): foldChange = float(foldChange)
480 if pValue >= ARGS.pValue: # pValue above threshold: dashed arrow
481 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
482 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
483
484 continue
485
486 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
487 INVALID_ARROW.styleReactionElements(metabMap, reactionId)
488 INVALID_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
489
490 continue
491
492 width = Arrow.MAX_W
493 if not math.isinf(foldChange):
494 try:
495 width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W)
496
497 except ZeroDivisionError: pass
498 # TODO CHECK RV
499 #if not reactionId.endswith("_RV"): # RV stands for reversible reactions
500 # Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
501 # continue
502
503 #reactionId = reactionId[:-3] # Remove "_RV"
504
505 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score
506 if inversionScore == 2: foldChange *= -1
507 # ^^^ Style the inverse direction with the opposite sign netValue
508
509 # If the score is 1 (opposite signs) we use alternative colors vvv
510 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
511
512 # vvv These 2 if statements can both be true and can both happen
513 if ARGS.net: # style arrow head(s):
514 arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))
515 arrow.applyTo(("F_" if inversionScore == 2 else "B_") + reactionId, metabMap, f";stroke:{ArrowColor.Transparent};stroke-width:0;stroke-dasharray:none")
516
517 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
518
519
520 ############################ split class ######################################
521 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
522 """
523 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
524
525 Args:
526 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
527 resolve_rules : a :dict containing :float data
528
529 Returns:
530 dict : the dict with data grouped by class
531
532 Side effects:
533 classes : mut
534 """
535 class_pat :Dict[str, List[List[float]]] = {}
536 for i in range(len(classes)):
537 classe :str = classes.iloc[i, 1]
538 if pd.isnull(classe): continue
539
540 l :List[List[float]] = []
541 for j in range(i, len(classes)):
542 if classes.iloc[j, 1] == classe:
543 pat_id :str = classes.iloc[j, 0]
544 tmp = resolve_rules.get(pat_id, None)
545 if tmp is not None:
546 l.append(tmp)
547 classes.iloc[j, 1] = None
548
549 if l:
550 class_pat[classe] = list(map(list, zip(*l)))
551 continue
552
553 utils.logWarning(
554 f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)
555
556 return class_pat
557
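# Sketch of the grouping performed by split_class (sample and class names are hypothetical):
#   classes (id, class):            resolve_rules (id -> per-reaction fluxes):
#     sampleA  tumor                  {"sampleA": [1.0, 2.0], "sampleB": [3.0, 4.0]}
#     sampleB  tumor
#   -> {"tumor": [[1.0, 3.0], [2.0, 4.0]]}
# i.e. one inner list per reaction, holding that reaction's values across the class' samples.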
558 ############################ conversion ##############################################
559 #conversion from svg to png
560 def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
561 """
562 Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.
563
564 Args:
565 svg_path : path to SVG file
566 png_path : path for new PNG file
567 dpi : dots per inch of the generated PNG
568 scale : scaling factor for the generated PNG, computed internally when a size is provided
569 size : final effective width of the generated PNG
570
571 Returns:
572 None
573 """
574 if size:
575 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
576 scale = size / image.width
577 image = image.resize(scale)
578 else:
579 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)
580
581 white_background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
582 white_background = white_background.affine([scale, 0, 0, scale])
583
584 if white_background.bands != image.bands:
585 white_background = white_background.extract_band(0)
586
587 composite_image = white_background.composite2(image, 'over')
588 composite_image.write_to_file(png_path.show())
589
590 # single helper function: the file paths are kept outside and passed in as inputs
591 #conversion from png to pdf
592 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
593 """
594 Internal utility to convert a PNG to PDF, as part of the SVG-to-PDF conversion.
595
596 Args:
597 png_file : path to PNG file
598 pdf_file : path to new PDF file
599
600 Returns:
601 None
602 """
603 image = Image.open(png_file.show())
604 image = image.convert("RGB")
605 image.save(pdf_file.show(), "PDF", resolution=100.0)
606
607 #function called to reduce redundancy in the code
608 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
609 """
610 Converts the SVG map at the provided path to PDF.
611
612 Args:
613 file_svg : path to SVG file
614 file_png : path to PNG file
615 file_pdf : path to new PDF file
616
617 Returns:
618 None
619 """
620 svg_to_png_with_background(file_svg, file_png)
621 try:
622 convert_png_to_pdf(file_png, file_pdf)
623 print(f'PDF file {file_pdf.filePath} successfully generated.')
624
625 except Exception as e:
626 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
627
628 ############################ map ##############################################
629 def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
630 """
631 Builds a FilePath instance from the names of confronted datasets ready to point to a location in the
632 "result/" folder, used by this tool for output files in collections.
633
634 Args:
635 dataset1Name : name of the first dataset in the comparison.
636 dataset2Name : name of the second dataset in the comparison. Defaults to "rest".
637 details : extra detail to embed in the file name (e.g. "Tabular Result").
638 ext : file format of the output file.
639
640 Returns:
641 utils.FilePath : path of the output file inside the configured output folder.
642 """
643 # This function returns a util data structure but is extremely specific to this module.
644 # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
645 # TODO: until a third tool with multiple outputs appears before porting this to utils.
646 return utils.FilePath(
647 f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
648 # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
649 # all output files: I don't care, this was never the performance bottleneck of the tool and
650 # there is no other net gain in saving and re-using the built string.
651 ext,
652 prefix = ARGS.output_path)
653
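# For example (assuming ARGS.output_path is the default "result" and utils.FilePath renders
# as prefix/name.extension):
#   buildOutputPath("DatasetA", "DatasetB", details = "Tabular Result", ext = utils.FileFormat.TSV)
#   -> a FilePath pointing to "result/DatasetA_vs_DatasetB (Tabular Result).tsv"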
654 FIELD_NOT_AVAILABLE = '/'
655 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
656 fieldsAmt = len(fieldNames)
657 with open(outPath.show(), "w", newline = "") as fd:
658 writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
659 writer.writeheader()
660
661 for row in rows:
662 sizeMismatch = fieldsAmt - len(row)
663 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
664 writer.writerow({ field : data for field, data in zip(fieldNames, row) })
665
666 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
667 def writeTabularResult(enrichedScores : OldEnrichedScores, outPath :utils.FilePath) -> None:
668 fieldNames = ["ids", "P_Value", "fold change", "z-score"]
669 fieldNames.extend(["average_1", "average_2"])
670
671 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
672
673 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest") -> None:
674 # this function handles the steps shared by all comparison modes after enrichment.
675 # TODO: organize, name better.
676 writeTabularResult(tmp, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV))
677 for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
678 applyFluxesEnrichmentToMap(tmp, core_map, max_z_score)
679
680 def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
681 """
682 Computes the statistical significance score (P-value) of the comparison between coherent data
683 from two datasets. The data is supposed to, in both datasets:
684 - be related to the same reaction ID;
685 - be ordered by sample, such that the item at position i in both lists is related to the
686 same sample or cell line.
687
688 Args:
689 dataset1Data : data from the 1st dataset.
690 dataset2Data : data from the 2nd dataset.
691
692 Returns:
693 tuple: (P-value, Z-score)
694 - P-value from a Kolmogorov-Smirnov test on the provided data.
695 - Z-score of the difference between means of the two datasets.
696 """
697 # Perform Kolmogorov-Smirnov test
698 ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data)
699
700 # Calculate means and standard deviations
701 mean1 = np.mean(dataset1Data)
702 mean2 = np.mean(dataset2Data)
703 std1 = np.std(dataset1Data, ddof=1)
704 std2 = np.std(dataset2Data, ddof=1)
705
706 n1 = len(dataset1Data)
707 n2 = len(dataset2Data)
708
709 # Calculate Z-score
710 z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))
711
712 return p_value, z_score
713
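# In short: the p-value comes from scipy's two-sample Kolmogorov-Smirnov test, while the
# z-score is the difference of the means scaled by the unpooled standard error,
#   z = (mean1 - mean2) / sqrt(s1^2/n1 + s2^2/n2)
# For example (hypothetical data), computePValue([1.0, 1.1, 0.9], [2.0, 2.1, 1.9]) yields a
# strongly negative z-score, since the first sample's mean lies well below the second's.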
714 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
715 #TODO: the following code still suffers from unclear variable names
716 tmp :Dict[str, List[Union[float, FoldChange]]] = {}
717 count = 0
718 max_z_score = 0
719 for l1, l2 in zip(dataset1Data, dataset2Data):
720 reactId = ids[count]
721 count += 1
722 if not reactId: continue # we skip ids that have already been processed
723
724 try:
725 p_value, z_score = computePValue(l1, l2)
726 avg1 = sum(l1) / len(l1)
727 avg2 = sum(l2) / len(l2)
728 f_c = fold_change(avg1, avg2)
729 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
730
731 tmp[reactId] = [float(p_value), f_c, z_score, avg1, avg2]
732 except (TypeError, ZeroDivisionError): continue
733
734 return tmp, max_z_score
735
736 def computeEnrichment(class_pat :Dict[str, List[List[float]]], ids :List[str]) -> List[Tuple[str, str, dict, float]]:
737 """
738 Compares clustered data based on the selected comparison mode and computes the enrichment
739 statistics later used to style the metabolic map.
740
741 Args:
742 class_pat : the clustered data.
743 ids : ids for data association.
744
745
746 Returns:
747 List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score.
748
749 Raises:
750 sys.exit : if fewer than 2 classes are available for comparison
751
752 """
753 class_pat = { k.strip() : v for k, v in class_pat.items() }
754 #TODO: simplify this stuff vvv and stop using sys.exit (raise the correct utils error)
755 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: fewer than two classes were provided for comparison\n')
756
757 enrichment_results = []
758
759
760 if ARGS.comparison == "manyvsmany":
761 for i, j in it.combinations(class_pat.keys(), 2):
762 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
763 enrichment_results.append((i, j, comparisonDict, max_z_score))
764
765 elif ARGS.comparison == "onevsrest":
766 for single_cluster in class_pat.keys():
767 rest = [item for k, v in class_pat.items() if k != single_cluster for item in v]
768
769 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
770 enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score))
771
772 elif ARGS.comparison == "onevsmany":
773 controlItems = class_pat.get(ARGS.control)
774 for otherDataset in class_pat.keys():
775 if otherDataset == ARGS.control:
776 continue
777 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
778 enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score))
779 return enrichment_results
780
781 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
782 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG)
783 utils.writeSvg(svgFilePath, core_map)
784
785 if ARGS.generate_pdf:
786 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG)
787 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF)
788 convert_to_pdf(svgFilePath, pngPath, pdfPath)
789
790 if not ARGS.generate_svg:
791 os.remove(svgFilePath.show())
792
793 ClassPat = Dict[str, List[List[float]]]
794 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
795 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
796 # for the sake of everyone's sanity.
797 class_pat :ClassPat = {}
798 if ARGS.option == 'datasets':
799 num = 1 #TODO: the dataset naming function could be a generator
800 for path, name in zip(datasetsPaths, names):
801 name = name_dataset(name, num)
802 resolve_rules_float, ids = getDatasetValues(path, name)
803 if resolve_rules_float is not None:
804 class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
805
806 num += 1
807
808 elif ARGS.option == "dataset_class":
809 classes = read_dataset(classPath, "class")
810 classes = classes.astype(str)
811 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
812 # check that every class ID has a match among the dataset IDs
813 if not all(classes.iloc[:, 0].isin(ids)):
814 raise utils.DataErr(classPath, "Class IDs do not match dataset IDs")
815 if resolve_rules_float is not None: class_pat = split_class(classes, resolve_rules_float)
816
817 return ids, class_pat
818 #^^^ TODO: this could become a match statement over an enum once Python 3.12 is available (which is why the ifs were kept)
819
820 #TODO: create these damn args as FilePath objects
821 def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
822 """
823 Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
824
825 Args:
826 datasetPath : path to the dataset
827 datasetName (str): dataset name, used in error reporting
828
829 Returns:
830 Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
831 """
832 dataset = read_dataset(datasetPath, datasetName)
833 IDs = pd.Series.tolist(dataset.iloc[:, 0].astype(str))
834
835 dataset = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
836 return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
837
838 def rgb_to_hex(rgb):
839 """
840 Convert RGB values (0-1 range) to hexadecimal color format.
841
842 Args:
843 rgb (numpy.ndarray): An array of RGB color components (in the range [0, 1]).
844
845 Returns:
846 str: The color in hexadecimal format (e.g., '#ff0000' for red).
847 """
848 # Convert RGB values (0-1 range) to hexadecimal format
849 rgb = (np.array(rgb) * 255).astype(int)
850 return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])
851
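# For example:
#   rgb_to_hex([1.0, 0.0, 0.0])               -> "#ff0000"
#   rgb_to_hex(plt.get_cmap("viridis")(0.5))  -> the hex code of the colormap's midpoint
#   (cmap(...) returns RGBA; only the first three channels are used by the format string)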
852
853
854 def save_colormap_image(min_value: float, max_value: float, path: utils.FilePath, colorMap:str="viridis"):
855 """
856 Create and save an image of the colormap showing the gradient and its range.
857
858 Args:
859 min_value (float): The minimum value of the colormap range.
860 max_value (float): The maximum value of the colormap range.
861 path (utils.FilePath): The output path for the saved image.
colorMap (str): Name of the matplotlib colormap to render. Defaults to "viridis".
862 """
863
864 # Create a colormap using matplotlib
865 cmap = plt.get_cmap(colorMap)
866
867 # Create a figure and axis
868 fig, ax = plt.subplots(figsize=(6, 1))
869 fig.subplots_adjust(bottom=0.5)
870
871 # Create a gradient image
872 gradient = np.linspace(0, 1, 256)
873 gradient = np.vstack((gradient, gradient))
874
875 # Add min and max value annotations
876 ax.text(0, 0.5, f'{np.round(min_value, 3)}', va='center', ha='right', transform=ax.transAxes, fontsize=12, color='black')
877 ax.text(1, 0.5, f'{np.round(max_value, 3)}', va='center', ha='left', transform=ax.transAxes, fontsize=12, color='black')
878
879
880 # Display the gradient image
881 ax.imshow(gradient, aspect='auto', cmap=cmap)
882 ax.set_axis_off()
883
884 # Save the image
885 plt.savefig(path.show(), bbox_inches='tight', pad_inches=0)
886 plt.close()
887 pass
888
889 def min_nonzero_abs(arr):
890 # Flatten the array and filter out zeros, then find the minimum of the remaining values
891 non_zero_elements = np.abs(arr)[np.abs(arr) > 0]
892 return np.min(non_zero_elements) if non_zero_elements.size > 0 else None
893
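# For example:
#   min_nonzero_abs(np.array([0.0, -0.25, 3.0])) -> 0.25
#   min_nonzero_abs(np.array([0.0, 0.0]))        -> None   (no non-zero entries)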
894 def computeEnrichmentMeanMedian(metabMap: ET.ElementTree, class_pat: Dict[str, List[List[float]]], ids: List[str], colormap:str) -> None:
895 """
896 Compute and visualize the metabolic map based on mean and median of the input fluxes.
897 The fluxes are normalised across classes/datasets and visualised using the given colormap.
898
899 Args:
900 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
901 class_pat (Dict[str, List[List[float]]]): A dictionary where keys are class names and values are lists of enrichment values.
902 ids (List[str]): A list of reaction IDs to be used for coloring arrows.
colormap (str): Name of the matplotlib colormap used to color the arrows.
903
904 Returns:
905 None
906 """
907 # Work on separate copies of the map for the mean and median renderings
908 metabMap_mean = copy.deepcopy(metabMap)
909 metabMap_median = copy.deepcopy(metabMap)
910
911 # Compute medians and means
912 medians = {key: np.round(np.median(np.array(value), axis=1), 6) for key, value in class_pat.items()}
913 means = {key: np.round(np.mean(np.array(value), axis=1),6) for key, value in class_pat.items()}
914
915 # Normalize medians and means
916 max_flux_medians = max(np.max(np.abs(arr)) for arr in medians.values())
917 max_flux_means = max(np.max(np.abs(arr)) for arr in means.values())
918
919 min_flux_medians = min(min_nonzero_abs(arr) for arr in medians.values())
920 min_flux_means = min(min_nonzero_abs(arr) for arr in means.values())
921
922 medians = {key: median/max_flux_medians for key, median in medians.items()}
923 means = {key: mean/max_flux_means for key, mean in means.items()}
924
925 save_colormap_image(min_flux_medians, max_flux_medians, utils.FilePath("Color map median", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
926 save_colormap_image(min_flux_means, max_flux_means, utils.FilePath("Color map mean", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
927
928 cmap = plt.get_cmap(colormap)
929
930 for key in class_pat:
931 # Create color mappings for median and mean
932 colors_median = {
933 rxn_id: rgb_to_hex(cmap(abs(medians[key][i]))) if medians[key][i] != 0 else '#bebebe' # grey for blocked (zero flux) reactions
934 for i, rxn_id in enumerate(ids)
935 }
936
937 colors_mean = {
938 rxn_id: rgb_to_hex(cmap(abs(means[key][i]))) if means[key][i] != 0 else '#bebebe' # grey for blocked (zero flux) reactions
939 for i, rxn_id in enumerate(ids)
940 }
941
942 for i, rxn_id in enumerate(ids):
943 isNegative = medians[key][i] < 0
944
945 # Apply median arrows
946 apply_arrow(metabMap_median, rxn_id, colors_median[rxn_id], isNegative)
947
948 isNegative = means[key][i] < 0
949 # Apply mean arrows
950 apply_arrow(metabMap_mean, rxn_id, colors_mean[rxn_id], isNegative)
951
952 # Save and convert the SVG files
953 save_and_convert(metabMap_mean, "mean", key)
954 save_and_convert(metabMap_median, "median", key)
955
956 def apply_arrow(metabMap, rxn_id, color, isNegative):
957 """
958 Apply an arrow to a specific reaction in the metabolic map with a given color.
959
960 Args:
961 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
962 rxn_id (str): The ID of the reaction to which the arrow will be applied.
963 color (str): The color of the arrow in hexadecimal format.
isNegative (bool): Whether the reaction value is negative; decides which arrow tip stays visible.
964
965 Returns:
966 None
967 """
968 arrow = Arrow(width=5, col=color)
969 arrow.styleReactionElementsMeanMedian(metabMap, rxn_id, isNegative)
970 pass
971
972 def save_and_convert(metabMap, map_type, key):
973 """
974 Save the metabolic map as an SVG file and optionally convert it to PNG and PDF formats.
975
976 Args:
977 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
978 map_type (str): The type of map ('mean' or 'median').
979 key (str): The key identifying the specific map.
980
981 Returns:
982 None
983 """
984 svgFilePath = utils.FilePath(f"SVG Map {map_type} - {key}", ext=utils.FileFormat.SVG, prefix=ARGS.output_path)
985 utils.writeSvg(svgFilePath, metabMap)
986 if ARGS.generate_pdf:
987 pngPath = utils.FilePath(f"PNG Map {map_type} - {key}", ext=utils.FileFormat.PNG, prefix=ARGS.output_path)
988 pdfPath = utils.FilePath(f"PDF Map {map_type} - {key}", ext=utils.FileFormat.PDF, prefix=ARGS.output_path)
989 convert_to_pdf(svgFilePath, pngPath, pdfPath)
990 if not ARGS.generate_svg:
991 os.remove(svgFilePath.show())
992
993
994 ############################ MAIN #############################################
995 def main(args:List[str] = None) -> None:
996 """
997 Initializes everything and sets the program in motion based on the frontend input arguments.
998
999 Returns:
1000 None
1001
1002 Raises:
1003 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
1004 """
1005
1006 global ARGS
1007 ARGS = process_args(args)
1008
1009 if not os.path.isdir(ARGS.output_path): os.makedirs(ARGS.output_path)
1010
1011 core_map :ET.ElementTree = ARGS.choice_map.getMap(
1012 ARGS.tool_dir,
1013 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
1014 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given.
1015 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner
1016 # solution can be derived from my comment in FilePath.fromStrPath
1017
1018 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_fluxes, ARGS.input_data_fluxes, ARGS.input_class_fluxes, ARGS.names_fluxes)
1019
1020 if(ARGS.choice_map == utils.Model.HMRcore):
1021 temp_map = utils.Model.HMRcore_no_legend
1022 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1023 elif(ARGS.choice_map == utils.Model.ENGRO2):
1024 temp_map = utils.Model.ENGRO2_no_legend
1025 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1026 else:
1027 computeEnrichmentMeanMedian(core_map, class_pat, ids, ARGS.color_map)
1028
1029
1030 enrichment_results = computeEnrichment(class_pat, ids)
1031 for i, j, comparisonDict, max_z_score in enrichment_results:
1032 map_copy = copy.deepcopy(core_map)
1033 temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j)
1034 createOutputMaps(i, j, map_copy)
1035
1036 if not ERRORS: return
1037 utils.logWarning(
1038 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}",
1039 ARGS.out_log)
1040
1041 print('Execution succeeded')
1042
1043 ###############################################################################
1044 if __name__ == "__main__":
1045 main()
1046