comparison: flux_to_map.py @ 150:834009d1a094 (draft)
author: luca_milaz
date: Wed, 06 Nov 2024 21:00:17 +0000
compared revisions: 149:8f47c038cc28 → 150:834009d1a094
1 from __future__ import division
2 import csv
3 from enum import Enum
4 import re
5 import sys
6 import numpy as np
7 import pandas as pd
8 import itertools as it
9 import scipy.stats as st
10 import lxml.etree as ET
11 import math
12 import utils.general_utils as utils
13 from PIL import Image
14 import os
15 import copy
16 import argparse
17 import pyvips
18 from PIL import Image, ImageDraw, ImageFont
19 from typing import Tuple, Union, Optional, List, Dict
20 import matplotlib.pyplot as plt
21
22 ERRORS = []
23 ########################## argparse ##########################################
24 ARGS :argparse.Namespace
25 def process_args(args:List[str] = None) -> argparse.Namespace:
26 """
27 Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
28
29 Args:
30 args : Always obtained (in file) from sys.argv
31
32 Returns:
33 Namespace : An object containing the parsed arguments
34 """
35 parser = argparse.ArgumentParser(
36 usage = "%(prog)s [options]",
37 description = "process reaction flux values to create a comparison map.")
38
39 #General:
40 parser.add_argument(
41 '-td', '--tool_dir',
42 type = str,
43 required = True,
44 help = 'your tool directory')
45
46 parser.add_argument('-on', '--control', type = str)
47 parser.add_argument('-ol', '--out_log', help = "Output log")
48
49 #Computation details:
50 parser.add_argument(
51 '-co', '--comparison',
52 type = str,
53 default = '1vs1',
54 choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
55
56 parser.add_argument(
57 '-pv' ,'--pValue',
58 type = float,
59 default = 0.1,
60 help = 'P-Value threshold (default: %(default)s)')
61
62 parser.add_argument(
63 '-fc', '--fChange',
64 type = float,
65 default = 1.5,
66 help = 'Fold-Change threshold (default: %(default)s)')
67
68
69 parser.add_argument(
70 '-op', '--option',
71 type = str,
72 choices = ['datasets', 'dataset_class'],
73 help='dataset or dataset and class')
74
75 parser.add_argument(
76 '-idf', '--input_data_fluxes',
77 type = str,
78 help = 'input dataset fluxes')
79
80 parser.add_argument(
81 '-icf', '--input_class_fluxes',
82 type = str,
83 help = 'sample group specification fluxes')
84
85 parser.add_argument(
86 '-idsf', '--input_datas_fluxes',
87 type = str,
88 nargs = '+',
89 help = 'input datasets fluxes')
90
91 parser.add_argument(
92 '-naf', '--names_fluxes',
93 type = str,
94 nargs = '+',
95 help = 'input names fluxes')
96
97 #Output:
98 parser.add_argument(
99 "-gs", "--generate_svg",
100 type = utils.Bool("generate_svg"), default = True,
101 help = "choose whether to generate svg")
102
103 parser.add_argument(
104 "-gp", "--generate_pdf",
105 type = utils.Bool("generate_pdf"), default = True,
106 help = "choose whether to generate pdf")
107
108 parser.add_argument(
109 '-cm', '--custom_map',
110 type = str,
111 help='custom map to use')
112
113 parser.add_argument(
114 '-mc', '--choice_map',
115 type = utils.Model, default = utils.Model.HMRcore,
116 choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])
117
118 parser.add_argument(
119 '-colorm', '--color_map',
120 type = str,
121 choices = ["jet", "viridis"])
122
123 parser.add_argument(
124 '-idop', '--output_path',
125 type = str,
126 default='result',
127 help = 'output path for maps')
128
129 args :argparse.Namespace = parser.parse_args(args)
130 args.net = True
131
132 return args
133
134 ############################ dataset input ####################################
135 def read_dataset(data :str, name :str) -> pd.DataFrame:
136 """
137 Tries to read the dataset from its path (data) as a tsv and turns it into a DataFrame.
138
139 Args:
140 data : filepath of a dataset (from frontend input params or literals upon calling)
141 name : name associated with the dataset (from frontend input params or literals upon calling)
142
143 Returns:
144 pd.DataFrame : dataset in a runtime operable shape
145
146 Raises:
147 sys.exit : if there's no data (pd.errors.EmptyDataError) or if the dataset has less than 2 columns
148 """
149 try:
150 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
151 except pd.errors.EmptyDataError:
152 sys.exit('Execution aborted: wrong format of ' + name + '\n')
153 if len(dataset.columns) < 2:
154 sys.exit('Execution aborted: wrong format of ' + name + '\n')
155 return dataset
156
157 ############################ dataset name #####################################
158 def name_dataset(name_data :str, count :int) -> str:
159 """
160 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
161
162 Args:
163 name_data : name associated with the dataset (from frontend input params)
164 count : counter from 1 to make these names unique (external)
165
166 Returns:
167 str : the name made unique
168 """
169 if str(name_data) == 'Dataset':
170 return str(name_data) + '_' + str(count)
171 else:
172 return str(name_data)
173
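# A minimal doctest-style usage sketch (illustrative names): default names get the counter
# appended, custom names are kept as-is.
#
# >>> name_dataset("Dataset", 2)
# 'Dataset_2'
# >>> name_dataset("Control", 1)
# 'Control'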
174 ############################ map_methods ######################################
175 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
176 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
177 """
178 Calculates the fold change between two gene expression values.
179
180 Args:
181 avg1 : average expression value from one dataset; avg2 : average expression value from the other dataset
182
183 Returns:
184 FoldChange :
185 0 : when both input values are 0
186 "-INF" : when avg1 is 0
187 "INF" : when avg2 is 0
188 float : for any other combination of values
189 """
190 if avg1 == 0 and avg2 == 0:
191 return 0
192 elif avg1 == 0:
193 return '-INF'
194 elif avg2 == 0:
195 return 'INF'
196 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) with threshold_F_C > 1
197 return (avg1 - avg2) / (abs(avg1) + abs(avg2))
198
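# A doctest-style sketch of the behaviour above (illustrative values): for two non-zero inputs
# the result is the normalized difference (avg1 - avg2) / (|avg1| + |avg2|), bounded in (-1, 1),
# while the string sentinels mark the cases where one average is zero.
#
# >>> fold_change(0, 0)
# 0
# >>> fold_change(0, 2.5)
# '-INF'
# >>> fold_change(3.0, 1.0)
# 0.5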
199 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
200 """
201 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
202
203 Args:
204 l : current style string of an SVG element
205 col : new value for the "stroke" style property
206 width : new value for the "stroke-width" style property
207 dash : new value for the "stroke-dasharray" style property
208
209 Returns:
210 str : the fixed style string
211 """
212 tmp = l.split(';')
213 flag_col = False
214 flag_width = False
215 flag_dash = False
216 for i in range(len(tmp)):
217 if tmp[i].startswith('stroke:'):
218 tmp[i] = 'stroke:' + col
219 flag_col = True
220 if tmp[i].startswith('stroke-width:'):
221 tmp[i] = 'stroke-width:' + width
222 flag_width = True
223 if tmp[i].startswith('stroke-dasharray:'):
224 tmp[i] = 'stroke-dasharray:' + dash
225 flag_dash = True
226 if not flag_col:
227 tmp.append('stroke:' + col)
228 if not flag_width:
229 tmp.append('stroke-width:' + width)
230 if not flag_dash:
231 tmp.append('stroke-dasharray:' + dash)
232 return ';'.join(tmp)
233
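# A doctest-style sketch (illustrative style string): existing stroke properties are overwritten
# in place, missing ones are appended at the end.
#
# >>> fix_style('fill:#000000;stroke:#ff0000;stroke-width:1', '#6495ed', '8', 'none')
# 'fill:#000000;stroke:#6495ed;stroke-width:8;stroke-dasharray:none'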
234 # The type of d values is collapsed, losing precision, because the dict contains lists instead of tuples, please fix!
235 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
236 """
237 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
238
239 Args:
240 d : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys)
241 core_map : SVG map to modify
242 threshold_P_V : threshold for a p-value to be considered significant
243 threshold_F_C : threshold for a fold change value to be considered significant
244 max_z_score : highest z-score (absolute value)
245
246 Returns:
247 ET.ElementTree : the modified core_map
248
249 Side effects:
250 core_map : mut
251 """
252 maxT = 12
253 minT = 2
254 grey = '#BEBEBE'
255 blue = '#6495ed'
256 red = '#ecac68'
257 for el in core_map.iter():
258 el_id = str(el.get('id'))
259 if el_id.startswith('R_'):
260 tmp = d.get(el_id[2:])
261 if tmp != None:
262 p_val :float = tmp[0]
263 f_c = tmp[1]
264 z_score = tmp[2]
265 if p_val < threshold_P_V:
266 if not isinstance(f_c, str):
267 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): #
268 col = grey
269 width = str(minT)
270 else:
271 if f_c < 0:
272 col = blue
273 elif f_c > 0:
274 col = red
275 width = str(max((abs(z_score) * maxT) / max_z_score, minT))
276 else:
277 if f_c == '-INF':
278 col = blue
279 elif f_c == 'INF':
280 col = red
281 width = str(maxT)
282 dash = 'none'
283 else:
284 dash = '5,5'
285 col = grey
286 width = str(minT)
287 el.set('style', fix_style(el.get('style', ""), col, width, dash))
288 return core_map
289
290 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
291 """
292 Finds any element in the given map with the given ID. ID uniqueness in an svg file is recommended but
293 not enforced; if more than one element with the exact ID is found, only the first will be returned.
294
295 Args:
296 reactionId (str): exact ID of the requested element.
297 metabMap (ET.ElementTree): metabolic map containing the element.
298
299 Returns:
300 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
301 """
302 return utils.Result.Ok(
303 f"//*[@id=\"{reactionId}\"]").map(
304 lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
305 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
306 # ^^^ we shamelessly ignore the contents of the IndexError; it offers nothing to the user.
307
308 def styleMapElement(element :ET.Element, styleStr :str) -> None:
309 currentStyles :str = element.get("style", "")
310 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
311 currentStyles = ';'.join(currentStyles.split(';')[:-3])
312
313 element.set("style", currentStyles + styleStr)
314
315 class ReactionDirection(Enum):
316 Unknown = ""
317 Direct = "_F"
318 Inverse = "_B"
319
320 @classmethod
321 def fromDir(cls, s :str) -> "ReactionDirection":
322 # vvv as long as there are so few variants I actually condone the if spam:
323 if s == ReactionDirection.Direct.value: return ReactionDirection.Direct
324 if s == ReactionDirection.Inverse.value: return ReactionDirection.Inverse
325 return ReactionDirection.Unknown
326
327 @classmethod
328 def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
329 return ReactionDirection.fromDir(reactionId[-2:])
330
331 def getArrowBodyElementId(reactionId :str) -> str:
332 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
333 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
334 return f"R_{reactionId}"
335
336 def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
337 """
338 We attempt to extract the direction information from the provided reaction ID; if unsuccessful we provide the IDs of both directions.
339
340 Args:
341 reactionId : the provided reaction ID.
342
343 Returns:
344 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
345 """
346 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
347 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
348 return f"F_{reactionId}", f"B_{reactionId}"
349
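# A doctest-style sketch of how dataset reaction IDs map onto SVG element IDs for arrow bodies
# and heads ("PGI" is just an illustrative reaction name):
#
# >>> getArrowBodyElementId("PGI_F")
# 'R_PGI'
# >>> getArrowHeadElementId("PGI_F")
# ('F_PGI', '')
# >>> getArrowHeadElementId("PGI")   # unknown direction: both head IDs are returned
# ('F_PGI', 'B_PGI')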
350 class ArrowColor(Enum):
351 """
352 Encodes possible arrow colors based on their meaning in the enrichment process.
353 """
354 Invalid = "#BEBEBE" # gray, fold-change under threshold
355 Transparent = "#ffffff00" # white, not significant p-value
356 UpRegulated = "#ecac68" # red, up-regulated reaction
357 DownRegulated = "#6495ed" # blue, down-regulated reaction
358
359 UpRegulatedInv = "#FF0000"
360 # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with
361 # conflicting enrichment in the two directions.
362
363 DownRegulatedInv = "#0000FF"
364 # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with
365 # conflicting enrichment in the two directions.
366
367 @classmethod
368 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
369 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv)
370 return colors[useAltColor]
371
372 def __str__(self) -> str: return self.value
373
374 class Arrow:
375 """
376 Models the properties of a reaction arrow that change based on enrichment.
377 """
378 MIN_W = 2
379 MAX_W = 12
380
381 def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
382 """
383 (Private) Initializes an instance of Arrow.
384
385 Args:
386 width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
387 col : color of the arrow.
388 isDashed : whether the arrow should be dashed, meaning the associated p-value was not significant.
389
390 Returns:
391 None : practically, an Arrow instance.
392 """
393 self.w = width
394 self.col = col
395 self.dash = isDashed
396
397 def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
398 if getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr)).isErr:
399 ERRORS.append(reactionId)
400
401 def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
402 if not mindReactionDir:
403 return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
404
405 # Now we style the arrow head(s):
406 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
407 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
408 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
409
410 def styleReactionElementsMeanMedian(self, metabMap :ET.ElementTree, reactionId :str, isNegative:bool) -> None:
411
412 self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
413 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
414
415 if(isNegative):
416 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
417 self.col = ArrowColor.Transparent
418 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) #trasp
419 else:
420 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
421 self.col = ArrowColor.Transparent
422 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) #trasp
423
424
425
426 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
427 """
428 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
429
430 Args:
431 reactionId: the reaction ID, as encoded in the dataset.
432 mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
433
434 Returns:
435 str : the ID of an arrow's body or tips in the map.
436 """
437 # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
438 if not mindReactionDir: return "R_" + reactionId
439
440 #TODO: this is clearly something we need to make consistent in fluxes
441 return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[-2:] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
442
443 def toStyleStr(self, *, downSizedForTips = False) -> str:
444 """
445 Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
446
447 Returns:
448 str : the styles string.
449 """
450 width = self.w
451 if downSizedForTips: width *= 0.8
452 return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
453
454 # vvv These constants could be inside the class itself as static properties, but python
455 # was built by brainless organisms so here we are!
456 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
457 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
458
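# A doctest-style sketch of the style strings produced by these arrows (downSizedForTips scales
# the width by 0.8 for arrow heads):
#
# >>> INSIGNIFICANT_ARROW.toStyleStr()
# ';stroke:#BEBEBE;stroke-width:2;stroke-dasharray:5,5'
# >>> Arrow(Arrow.MAX_W, ArrowColor.UpRegulated).toStyleStr()
# ';stroke:#ecac68;stroke-width:12;stroke-dasharray:none'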
459 def applyFluxesEnrichmentToMap(fluxesEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
460 """
461 Applies fluxes enrichment results to the provided metabolic map.
462
463 Args:
464 fluxesEnrichmentRes : fluxes enrichment results.
465 metabMap : the metabolic map to edit.
466 maxNumericZScore : biggest finite z-score value found.
467
468 Side effects:
469 metabMap : mut
470
471 Returns:
472 None
473 """
474 for reactionId, values in fluxesEnrichmentRes.items():
475 pValue = values[0]
476 foldChange = values[1]
477 z_score = values[2]
478
479 if isinstance(foldChange, str): foldChange = float(foldChange)
480 if pValue >= ARGS.pValue: # pValue above threshold: dashed arrow
481 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
482 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
483
484 continue
485
486 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
487 INVALID_ARROW.styleReactionElements(metabMap, reactionId)
488 INVALID_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
489
490 continue
491
492 width = Arrow.MAX_W
493 if not math.isinf(foldChange):
494 try:
495 width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W)
496
497 except ZeroDivisionError: pass
498
499 #if not reactionId.endswith("_RV"): # RV stands for reversible reactions
500 # Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
501 # continue
502
503 #reactionId = reactionId[:-3] # Remove "_RV"
504
505 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score
506 if inversionScore == 2: foldChange *= -1
507 # ^^^ Style the inverse direction with the opposite sign netValue
508
509 # If the score is 1 (opposite signs) we use alternative colors vvv
510 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
511
512 # vvv These 2 if statements can both be true and can both happen
513 if ARGS.net: # style arrow head(s):
514 arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))
515 arrow.applyTo(("F_" if inversionScore == 2 else "B_") + reactionId, metabMap, f";stroke:{ArrowColor.Transparent};stroke-width:0;stroke-dasharray:None")
516
517 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
518
519
520 ############################ split class ######################################
521 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
522 """
523 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
524
525 Args:
526 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
527 resolve_rules : a :dict containing :float data
528
529 Returns:
530 dict : the dict with data grouped by class
531
532 Side effects:
533 classes : mut
534 """
535 class_pat :Dict[str, List[List[float]]] = {}
536 for i in range(len(classes)):
537 classe :str = classes.iloc[i, 1]
538 if pd.isnull(classe): continue
539
540 l :List[List[float]] = []
541 for j in range(i, len(classes)):
542 if classes.iloc[j, 1] == classe:
543 pat_id :str = classes.iloc[j, 0]
544 tmp = resolve_rules.get(pat_id, None)
545 if tmp != None:
546 l.append(tmp)
547 classes.iloc[j, 1] = None
548
549 if l:
550 class_pat[classe] = list(map(list, zip(*l)))
551 continue
552
553 utils.logWarning(
554 f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)
555
556 return class_pat
557
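# An illustrative sketch (made-up sample IDs and values): with a two-column class table
# (sample id, class) and per-sample value lists, samples are grouped by class and transposed,
# so each inner list collects one position across all samples of that class.
#
# >>> classes = pd.DataFrame([["s1", "A"], ["s2", "B"], ["s3", "A"]])
# >>> resolve_rules = {"s1": [1.0, 2.0], "s2": [3.0, 4.0], "s3": [5.0, 6.0]}
# >>> split_class(classes, resolve_rules)
# {'A': [[1.0, 5.0], [2.0, 6.0]], 'B': [[3.0], [4.0]]}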
558 ############################ conversion ##############################################
559 #conversion from svg to png
560 def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
561 """
562 Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.
563
564 Args:
565 svg_path : path to SVG file
566 png_path : path for new PNG file
567 dpi : dots per inch of the generated PNG
568 scale : scaling factor for the generated PNG, computed internally when a size is provided
569 size : final effective width of the generated PNG
570
571 Returns:
572 None
573 """
574 if size:
575 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
576 scale = size / image.width
577 image = image.resize(scale)
578 else:
579 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)
580
581 white_background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
582 white_background = white_background.affine([scale, 0, 0, scale])
583
584 if white_background.bands != image.bands:
585 white_background = white_background.extract_band(0)
586
587 composite_image = white_background.composite2(image, 'over')
588 composite_image.write_to_file(png_path.show())
589
590 # single function: I leave the files out and pass them in as input
591 #conversion from png to pdf
592 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
593 """
594 Internal utility to convert a PNG to PDF to aid in conversion from SVG.
595
596 Args:
597 png_file : path to PNG file
598 pdf_file : path to new PDF file
599
600 Returns:
601 None
602 """
603 image = Image.open(png_file.show())
604 image = image.convert("RGB")
605 image.save(pdf_file.show(), "PDF", resolution=100.0)
606
607 #function called to reduce redundancy in the code
608 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
609 """
610 Converts the SVG map at the provided path to PDF.
611
612 Args:
613 file_svg : path to SVG file
614 file_png : path to PNG file
615 file_pdf : path to new PDF file
616
617 Returns:
618 None
619 """
620 svg_to_png_with_background(file_svg, file_png)
621 try:
622 convert_png_to_pdf(file_png, file_pdf)
623 print(f'PDF file {file_pdf.filePath} successfully generated.')
624
625 except Exception as e:
626 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
627
628 ############################ map ##############################################
629 def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
630 """
631 Builds a FilePath instance from the names of the compared datasets, ready to point to a location in the
632 "result/" folder, used by this tool for output files in collections.
633
634 Args:
635 dataset1Name : name of the first dataset in the comparison.
636 dataset2Name : name of the second dataset. Defaults to "rest".
637 details : extra detail to embed in the output file name.
638 ext : file extension (format) of the output file.
639
640 Returns:
641 utils.FilePath : path for the output file, placed under the configured output folder.
642 """
643 # This function returns a util data structure but is extremely specific to this module.
644 # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
645 # TODO: until a third tool with multiple outputs appears before porting this to utils.
646 return utils.FilePath(
647 f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
648 # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
649 # all output files: I don't care, this was never the performance bottleneck of the tool and
650 # there is no other net gain in saving and re-using the built string.
651 ext,
652 prefix = ARGS.output_path)
653
654 FIELD_NOT_AVAILABLE = '/'
655 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
656 fieldsAmt = len(fieldNames)
657 with open(outPath.show(), "w", newline = "") as fd:
658 writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
659 writer.writeheader()
660
661 for row in rows:
662 sizeMismatch = fieldsAmt - len(row)
663 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
664 writer.writerow({ field : data for field, data in zip(fieldNames, row) })
665
666 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
667 def writeTabularResult(enrichedScores : OldEnrichedScores, outPath :utils.FilePath) -> None:
668 fieldNames = ["ids", "P_Value", "fold change"]
669 fieldNames.extend(["average_1", "average_2"])
670
671 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
672
673 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest") -> None:
674 # this function performs the steps common to all comparison modes after enrichment.
675 # TODO: organize, name better.
676 writeTabularResult(tmp, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV))
677 for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
678 applyFluxesEnrichmentToMap(tmp, core_map, max_z_score)
679
680 def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
681 """
682 Computes the statistical significance score (P-value) of the comparison between coherent data
683 from two datasets. The data is supposed to, in both datasets:
684 - be related to the same reaction ID;
685 - be ordered by sample, such that the item at position i in both lists is related to the
686 same sample or cell line.
687
688 Args:
689 dataset1Data : data from the 1st dataset.
690 dataset2Data : data from the 2nd dataset.
691
692 Returns:
693 tuple: (P-value, Z-score)
694 - P-value from a Kolmogorov-Smirnov test on the provided data.
695 - Z-score of the difference between means of the two datasets.
696 """
697 # Perform Kolmogorov-Smirnov test
698 ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data)
699
700 # Calculate means and standard deviations
701 mean1 = np.mean(dataset1Data)
702 mean2 = np.mean(dataset2Data)
703 std1 = np.std(dataset1Data, ddof=1)
704 std2 = np.std(dataset2Data, ddof=1)
705
706 n1 = len(dataset1Data)
707 n2 = len(dataset2Data)
708
709 # Calculate Z-score
710 z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))
711
712 return p_value, z_score
713
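# An illustrative sketch (made-up samples): the p-value comes from scipy's two-sample
# Kolmogorov-Smirnov test, the z-score from the difference of means over the pooled standard
# error, so well-separated samples yield a small p-value and a large |z|.
#
# >>> p_value, z_score = computePValue([1.0, 1.2, 0.9, 1.1], [2.0, 2.1, 1.9, 2.2])
# >>> round(float(z_score), 2)
# -10.95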
714 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
715 #TODO: the following code still suffers from "dumbvarnames-osis"
716 tmp :Dict[str, List[Union[float, FoldChange]]] = {}
717 count = 0
718 max_z_score = 0
719
720 for l1, l2 in zip(dataset1Data, dataset2Data):
721 reactId = ids[count]
722 count += 1
723 if not reactId: continue # we skip ids that have already been processed
724
725 try:
726 p_value, z_score = computePValue(l1, l2)
727 avg1 = sum(l1) / len(l1)
728 avg2 = sum(l2) / len(l2)
729 avg = fold_change(avg1, avg2)
730 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
731 tmp[reactId] = [float(p_value), avg, z_score, avg1, avg2]
732 except (TypeError, ZeroDivisionError): continue
733
734 return tmp, max_z_score
735
736 def computeEnrichment(class_pat :Dict[str, List[List[float]]], ids :List[str]) -> List[Tuple[str, str, dict, float]]:
737 """
738 Compares clustered data based on the given comparison mode and computes the enrichment results
739 for each pair of compared datasets.
740
741 Args:
742 class_pat : the clustered data.
743 ids : ids for data association.
744
745
746 Returns:
747 List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score.
748
749 Raises:
750 sys.exit : if there are less than 2 classes for comparison
751
752 """
753 class_pat = { k.strip() : v for k, v in class_pat.items() }
754 #TODO: simplify this stuff vvv and stop using sys.exit (raise the correct utils error)
755 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: classes provided for comparisons are less than two\n')
756
757 enrichment_results = []
758
759
760 if ARGS.comparison == "manyvsmany":
761 for i, j in it.combinations(class_pat.keys(), 2):
762 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
763 enrichment_results.append((i, j, comparisonDict, max_z_score))
764
765 elif ARGS.comparison == "onevsrest":
766 for single_cluster in class_pat.keys():
767 rest = [item for k, v in class_pat.items() if k != single_cluster for item in v]
768 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
769 enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score))
770
771 elif ARGS.comparison == "onevsmany":
772 controlItems = class_pat.get(ARGS.control)
773 for otherDataset in class_pat.keys():
774 if otherDataset == ARGS.control:
775 continue
776 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
777 enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score))
778 return enrichment_results
779
780 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
781 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG)
782 utils.writeSvg(svgFilePath, core_map)
783
784 if ARGS.generate_pdf:
785 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG)
786 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF)
787 convert_to_pdf(svgFilePath, pngPath, pdfPath)
788
789 if not ARGS.generate_svg:
790 os.remove(svgFilePath.show())
791
792 ClassPat = Dict[str, List[List[float]]]
793 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
794 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
795 # for the sake of everyone's sanity.
796 class_pat :ClassPat = {}
797 if ARGS.option == 'datasets':
798 num = 1 #TODO: the dataset naming function could be a generator
799 for path, name in zip(datasetsPaths, names):
800 name = name_dataset(name, num)
801 resolve_rules_float, ids = getDatasetValues(path, name)
802 if resolve_rules_float != None:
803 class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
804
805 num += 1
806
807 elif ARGS.option == "dataset_class":
808 classes = read_dataset(classPath, "class")
809 classes = classes.astype(str)
810
811 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
812 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float)
813
814 return ids, class_pat
815 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
816
817 #TODO: create these damn args as FilePath objects
818 def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
819 """
820 Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
821
822 Args:
823 datasetPath : path to the dataset
824 datasetName (str): dataset name, used in error reporting
825
826 Returns:
827 Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
828 """
829 dataset = read_dataset(datasetPath, datasetName)
830 IDs = pd.Series.tolist(dataset.iloc[:, 0].astype(str))
831
832 dataset = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
833 return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
834
835 def rgb_to_hex(rgb):
836 """
837 Convert RGB values (0-1 range) to hexadecimal color format.
838
839 Args:
840 rgb (numpy.ndarray): An array of RGB color components (in the range [0, 1]).
841
842 Returns:
843 str: The color in hexadecimal format (e.g., '#ff0000' for red).
844 """
845 # Convert RGB values (0-1 range) to hexadecimal format
846 rgb = (np.array(rgb) * 255).astype(int)
847 return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])
848
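# A doctest-style sketch: components in [0, 1] are scaled to 0-255 and truncated before
# formatting; an RGBA tuple from a matplotlib colormap also works, since only the first three
# channels are used.
#
# >>> rgb_to_hex((1.0, 0.0, 0.0))
# '#ff0000'
# >>> rgb_to_hex((0.5, 0.5, 0.5, 1.0))
# '#7f7f7f'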
849
850
851 def save_colormap_image(min_value: float, max_value: float, path: utils.FilePath, colorMap:str="viridis"):
852 """
853 Create and save an image of the colormap showing the gradient and its range.
854
855 Args:
856 min_value (float): The minimum value of the colormap range.
857 max_value (float): The maximum value of the colormap range.
858 path (utils.FilePath): The output path for saving the image.
859 """
860
861 # Create a colormap using matplotlib
862 cmap = plt.get_cmap(colorMap)
863
864 # Create a figure and axis
865 fig, ax = plt.subplots(figsize=(6, 1))
866 fig.subplots_adjust(bottom=0.5)
867
868 # Create a gradient image
869 gradient = np.linspace(0, 1, 256)
870 gradient = np.vstack((gradient, gradient))
871
872 # Add min and max value annotations
873 ax.text(0, 0.5, f'{np.round(min_value, 3)}', va='center', ha='right', transform=ax.transAxes, fontsize=12, color='black')
874 ax.text(1, 0.5, f'{np.round(max_value, 3)}', va='center', ha='left', transform=ax.transAxes, fontsize=12, color='black')
875
876
877 # Display the gradient image
878 ax.imshow(gradient, aspect='auto', cmap=cmap)
879 ax.set_axis_off()
880
881 # Save the image
882 plt.savefig(path.show(), bbox_inches='tight', pad_inches=0)
883 plt.close()
884 pass
885
886 def min_nonzero_abs(arr):
887 # Flatten the array and filter out zeros, then find the minimum of the remaining values
888 non_zero_elements = np.abs(arr)[np.abs(arr) > 0]
889 return np.min(non_zero_elements) if non_zero_elements.size > 0 else None
890
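# A doctest-style sketch: used below to pick the colormap's lower bound while ignoring
# blocked (zero-flux) reactions.
#
# >>> float(min_nonzero_abs(np.array([0.0, -0.5, 2.0])))
# 0.5
# >>> min_nonzero_abs(np.zeros(3)) is None
# True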
891 def computeEnrichmentMeanMedian(metabMap: ET.ElementTree, class_pat: Dict[str, List[List[float]]], ids: List[str], colormap:str) -> None:
892 """
893 Compute and visualize the metabolic map based on mean and median of the input fluxes.
894 The fluxes are normalised across classes/datasets and visualised using the given colormap.
895
896 Args:
897 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
898 class_pat (Dict[str, List[List[float]]]): A dictionary where keys are class names and values are lists of enrichment values.
899 ids (List[str]): A list of reaction IDs to be used for coloring arrows.
900
901 Returns:
902 None
903 """
904 # Create copies only if they are needed
905 metabMap_mean = copy.deepcopy(metabMap)
906 metabMap_median = copy.deepcopy(metabMap)
907
908 # Compute medians and means
909 medians = {key: np.round(np.median(np.array(value), axis=1), 6) for key, value in class_pat.items()}
910 means = {key: np.round(np.mean(np.array(value), axis=1),6) for key, value in class_pat.items()}
911
912 # Normalize medians and means
913 max_flux_medians = max(np.max(np.abs(arr)) for arr in medians.values())
914 max_flux_means = max(np.max(np.abs(arr)) for arr in means.values())
915
916 min_flux_medians = min(min_nonzero_abs(arr) for arr in medians.values())
917 min_flux_means = min(min_nonzero_abs(arr) for arr in means.values())
918
919 medians = {key: median/max_flux_medians for key, median in medians.items()}
920 means = {key: mean/max_flux_means for key, mean in means.items()}
921
922 save_colormap_image(min_flux_medians, max_flux_medians, utils.FilePath("Color map median", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
923 save_colormap_image(min_flux_means, max_flux_means, utils.FilePath("Color map mean", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
924
925 cmap = plt.get_cmap(colormap)
926
927 for key in class_pat:
928 # Create color mappings for median and mean
929 colors_median = {
930 rxn_id: rgb_to_hex(cmap(abs(medians[key][i]))) if medians[key][i] != 0 else '#bebebe' #grey blocked
931 for i, rxn_id in enumerate(ids)
932 }
933
934 colors_mean = {
935 rxn_id: rgb_to_hex(cmap(abs(means[key][i]))) if means[key][i] != 0 else '#bebebe' #grey blocked
936 for i, rxn_id in enumerate(ids)
937 }
938
939 for i, rxn_id in enumerate(ids):
940 isNegative = medians[key][i] < 0
941
942 # Apply median arrows
943 apply_arrow(metabMap_median, rxn_id, colors_median[rxn_id], isNegative)
944
945 isNegative = means[key][i] < 0
946 # Apply mean arrows
947 apply_arrow(metabMap_mean, rxn_id, colors_mean[rxn_id], isNegative)
948
949 # Save and convert the SVG files
950 save_and_convert(metabMap_mean, "mean", key)
951 save_and_convert(metabMap_median, "median", key)
952
953 def apply_arrow(metabMap, rxn_id, color, isNegative):
954 """
955 Apply an arrow to a specific reaction in the metabolic map with a given color.
956
957 Args:
958 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
959 rxn_id (str): The ID of the reaction to which the arrow will be applied.
960 color (str): The color of the arrow in hexadecimal format.
961
962 Returns:
963 None
964 """
965 arrow = Arrow(width=5, col=color)
966 arrow.styleReactionElementsMeanMedian(metabMap, rxn_id, isNegative)
967 pass
968
969 def save_and_convert(metabMap, map_type, key):
970 """
971 Save the metabolic map as an SVG file and optionally convert it to PNG and PDF formats.
972
973 Args:
974 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
975 map_type (str): The type of map ('mean' or 'median').
976 key (str): The key identifying the specific map.
977
978 Returns:
979 None
980 """
981 svgFilePath = utils.FilePath(f"SVG Map {map_type} - {key}", ext=utils.FileFormat.SVG, prefix=ARGS.output_path)
982 utils.writeSvg(svgFilePath, metabMap)
983 if ARGS.generate_pdf:
984 pngPath = utils.FilePath(f"PNG Map {map_type} - {key}", ext=utils.FileFormat.PNG, prefix=ARGS.output_path)
985 pdfPath = utils.FilePath(f"PDF Map {map_type} - {key}", ext=utils.FileFormat.PDF, prefix=ARGS.output_path)
986 convert_to_pdf(svgFilePath, pngPath, pdfPath)
987 if not ARGS.generate_svg:
988 os.remove(svgFilePath.show())
989
990
991 ############################ MAIN #############################################
992 def main(args:List[str] = None) -> None:
993 """
994 Initializes everything and sets the program in motion based on the frontend input arguments.
995
996 Returns:
997 None
998
999 Raises:
1000 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
1001 """
1002
1003 global ARGS
1004 ARGS = process_args(args)
1005
1006 if not os.path.isdir(ARGS.output_path): os.makedirs(ARGS.output_path)
1007
1008 core_map :ET.ElementTree = ARGS.choice_map.getMap(
1009 ARGS.tool_dir,
1010 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
1011 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given.
1012 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner
1013 # solution can be derived from my comment in FilePath.fromStrPath
1014
1015 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_fluxes, ARGS.input_data_fluxes, ARGS.input_class_fluxes, ARGS.names_fluxes)
1016
1017 if(ARGS.choice_map == utils.Model.HMRcore):
1018 temp_map = utils.Model.HMRcore_no_legend
1019 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1020 elif(ARGS.choice_map == utils.Model.ENGRO2):
1021 temp_map = utils.Model.ENGRO2_no_legend
1022 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1023 else:
1024 computeEnrichmentMeanMedian(core_map, class_pat, ids, ARGS.color_map)
1025
1026
1027 enrichment_results = computeEnrichment(class_pat, ids)
1028 for i, j, comparisonDict, max_z_score in enrichment_results:
1029 map_copy = copy.deepcopy(core_map)
1030 temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j)
1031 createOutputMaps(i, j, map_copy)
1032
1033 if not ERRORS: return
1034 utils.logWarning(
1035 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}",
1036 ARGS.out_log)
1037
1038 print('Execution succeeded')
1039
1040 ###############################################################################
1041 if __name__ == "__main__":
1042 main()
1043