comparison COBRAxy/marea.py @ 93:7e703e546998 draft

Uploaded
author luca_milaz
date Sun, 13 Oct 2024 11:41:34 +0000
parents 41f35c2f0c7b
children 507efdc9d226
comparison
equal deleted inserted replaced
92:fdf713bb5772 93:7e703e546998
1 from __future__ import division
2 import csv
3 from enum import Enum
4 import re
5 import sys
6 import numpy as np
7 import pandas as pd
8 import itertools as it
9 import scipy.stats as st
10 import lxml.etree as ET
11 import math
12 import utils.general_utils as utils
13 from PIL import Image
14 import os
15 import argparse
16 import pyvips
17 from typing import Tuple, Union, Optional, List, Dict
18
19 ERRORS = []
20 ########################## argparse ##########################################
21 ARGS :argparse.Namespace
22 def process_args() -> argparse.Namespace:
23 """
24 Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
25
26 Args:
27 args : Always obtained (in file) from sys.argv
28
29 Returns:
30 Namespace : An object containing the parsed arguments
31 """
32 parser = argparse.ArgumentParser(
33 usage = "%(prog)s [options]",
34 description = "process some value's genes to create a comparison's map.")
35
36 #General:
37 parser.add_argument(
38 '-td', '--tool_dir',
39 type = str,
40 required = True,
41 help = 'your tool directory')
42
43 parser.add_argument('-on', '--control', type = str)
44 parser.add_argument('-ol', '--out_log', help = "Output log")
45
46 #Computation details:
47 parser.add_argument(
48 '-co', '--comparison',
49 type = str,
50 default = '1vs1',
51 choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
52
53 parser.add_argument(
54 '-pv' ,'--pValue',
55 type = float,
56 default = 0.1,
57 help = 'P-Value threshold (default: %(default)s)')
58
59 parser.add_argument(
60 '-fc', '--fChange',
61 type = float,
62 default = 1.5,
63 help = 'Fold-Change threshold (default: %(default)s)')
64
65 parser.add_argument(
66 "-ne", "--net",
67 type = utils.Bool("net"), default = False,
68 help = "choose if you want net enrichment for RPS")
69
70 parser.add_argument(
71 '-op', '--option',
72 type = str,
73 choices = ['datasets', 'dataset_class'],
74 help='dataset or dataset and class')
75
76 #RAS:
77 parser.add_argument(
78 "-ra", "--using_RAS",
79 type = utils.Bool("using_RAS"), default = True,
80 help = "choose whether to use RAS datasets.")
81
82 parser.add_argument(
83 '-id', '--input_data',
84 type = str,
85 help = 'input dataset')
86
87 parser.add_argument(
88 '-ic', '--input_class',
89 type = str,
90 help = 'sample group specification')
91
92 parser.add_argument(
93 '-ids', '--input_datas',
94 type = str,
95 nargs = '+',
96 help = 'input datasets')
97
98 parser.add_argument(
99 '-na', '--names',
100 type = str,
101 nargs = '+',
102 help = 'input names')
103
104 #RPS:
105 parser.add_argument(
106 "-rp", "--using_RPS",
107 type = utils.Bool("using_RPS"), default = False,
108 help = "choose whether to use RPS datasets.")
109
110 parser.add_argument(
111 '-idr', '--input_data_rps',
112 type = str,
113 help = 'input dataset rps')
114
115 parser.add_argument(
116 '-icr', '--input_class_rps',
117 type = str,
118 help = 'sample group specification rps')
119
120 parser.add_argument(
121 '-idsr', '--input_datas_rps',
122 type = str,
123 nargs = '+',
124 help = 'input datasets rps')
125
126 parser.add_argument(
127 '-nar', '--names_rps',
128 type = str,
129 nargs = '+',
130 help = 'input names rps')
131
132 #Output:
133 parser.add_argument(
134 "-gs", "--generate_svg",
135 type = utils.Bool("generate_svg"), default = True,
136 help = "choose whether to use RAS datasets.")
137
138 parser.add_argument(
139 "-gp", "--generate_pdf",
140 type = utils.Bool("generate_pdf"), default = True,
141 help = "choose whether to use RAS datasets.")
142
143 parser.add_argument(
144 '-cm', '--custom_map',
145 type = str,
146 help='custom map to use')
147
148 parser.add_argument(
149 '-mc', '--choice_map',
150 type = utils.Model, default = utils.Model.HMRcore,
151 choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])
152
153 args :argparse.Namespace = parser.parse_args()
154 if args.using_RAS and not args.using_RPS: args.net = False
155
156 return args
157
158 ############################ dataset input ####################################
159 def read_dataset(data :str, name :str) -> pd.DataFrame:
160 """
161 Tries to read the dataset from its path (data) as a tsv and turns it into a DataFrame.
162
163 Args:
164 data : filepath of a dataset (from frontend input params or literals upon calling)
165 name : name associated with the dataset (from frontend input params or literals upon calling)
166
167 Returns:
168 pd.DataFrame : dataset in a runtime operable shape
169
170 Raises:
171 sys.exit : if there's no data (pd.errors.EmptyDataError) or if the dataset has less than 2 columns
172 """
173 try:
174 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
175 except pd.errors.EmptyDataError:
176 sys.exit('Execution aborted: wrong format of ' + name + '\n')
177 if len(dataset.columns) < 2:
178 sys.exit('Execution aborted: wrong format of ' + name + '\n')
179 return dataset
180
181 ############################ dataset name #####################################
182 def name_dataset(name_data :str, count :int) -> str:
183 """
184 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
185
186 Args:
187 name_data : name associated with the dataset (from frontend input params)
188 count : counter from 1 to make these names unique (external)
189
190 Returns:
191 str : the name made unique
192 """
193 if str(name_data) == 'Dataset':
194 return str(name_data) + '_' + str(count)
195 else:
196 return str(name_data)
197
198 ############################ map_methods ######################################
199 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
200 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
201 """
202 Calculates the fold change between two gene expression values.
203
204 Args:
205 avg1 : average expression value from one dataset avg2 : average expression value from the other dataset
206
207 Returns:
208 FoldChange :
209 0 : when both input values are 0
210 "-INF" : when avg1 is 0
211 "INF" : when avg2 is 0
212 float : for any other combination of values
213 """
214 if avg1 == 0 and avg2 == 0:
215 return 0
216 elif avg1 == 0:
217 return '-INF'
218 elif avg2 == 0:
219 return 'INF'
220 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1
221 return (avg1 - avg2) / (abs(avg1) + abs(avg2))
222
223 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
224 """
225 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
226
227 Args:
228 l : current style string of an SVG element
229 col : new value for the "stroke" style property
230 width : new value for the "stroke-width" style property
231 dash : new value for the "stroke-dasharray" style property
232
233 Returns:
234 str : the fixed style string
235 """
236 tmp = l.split(';')
237 flag_col = False
238 flag_width = False
239 flag_dash = False
240 for i in range(len(tmp)):
241 if tmp[i].startswith('stroke:'):
242 tmp[i] = 'stroke:' + col
243 flag_col = True
244 if tmp[i].startswith('stroke-width:'):
245 tmp[i] = 'stroke-width:' + width
246 flag_width = True
247 if tmp[i].startswith('stroke-dasharray:'):
248 tmp[i] = 'stroke-dasharray:' + dash
249 flag_dash = True
250 if not flag_col:
251 tmp.append('stroke:' + col)
252 if not flag_width:
253 tmp.append('stroke-width:' + width)
254 if not flag_dash:
255 tmp.append('stroke-dasharray:' + dash)
256 return ';'.join(tmp)
257
258 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix!
259 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
260 """
261 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
262
263 Args:
264 d : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys)
265 core_map : SVG map to modify
266 threshold_P_V : threshold for a p-value to be considered significant
267 threshold_F_C : threshold for a fold change value to be considered significant
268 max_z_score : highest z-score (absolute value)
269
270 Returns:
271 ET.ElementTree : the modified core_map
272
273 Side effects:
274 core_map : mut
275 """
276 maxT = 12
277 minT = 2
278 grey = '#BEBEBE'
279 blue = '#6495ed'
280 red = '#ecac68'
281 for el in core_map.iter():
282 el_id = str(el.get('id'))
283 if el_id.startswith('R_'):
284 tmp = d.get(el_id[2:])
285 if tmp != None:
286 p_val :float = tmp[0]
287 f_c = tmp[1]
288 z_score = tmp[2]
289 if p_val < threshold_P_V:
290 if not isinstance(f_c, str):
291 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): #
292 col = grey
293 width = str(minT)
294 else:
295 if f_c < 0:
296 col = blue
297 elif f_c > 0:
298 col = red
299 width = str(max((abs(z_score) * maxT) / max_z_score, minT))
300 else:
301 if f_c == '-INF':
302 col = blue
303 elif f_c == 'INF':
304 col = red
305 width = str(maxT)
306 dash = 'none'
307 else:
308 dash = '5,5'
309 col = grey
310 width = str(minT)
311 el.set('style', fix_style(el.get('style', ""), col, width, dash))
312 return core_map
313
314 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
315 """
316 Finds any element in the given map with the given ID. ID uniqueness in an svg file is recommended but
317 not enforced, if more than one element with the exact ID is found only the first will be returned.
318
319 Args:
320 reactionId (str): exact ID of the requested element.
321 metabMap (ET.ElementTree): metabolic map containing the element.
322
323 Returns:
324 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
325 """
326 return utils.Result.Ok(
327 f"//*[@id=\"{reactionId}\"]").map(
328 lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
329 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
330 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user.
331
332 def styleMapElement(element :ET.Element, styleStr :str) -> None:
333 currentStyles :str = element.get("style", "")
334 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
335 currentStyles = ';'.join(currentStyles.split(';')[:-3])
336
337 element.set("style", currentStyles + styleStr)
338
339 class ReactionDirection(Enum):
340 Unknown = ""
341 Direct = "_F"
342 Inverse = "_B"
343
344 @classmethod
345 def fromDir(cls, s :str) -> "ReactionDirection":
346 # vvv as long as there's so few variants I actually condone the if spam:
347 if s == ReactionDirection.Direct.value: return ReactionDirection.Direct
348 if s == ReactionDirection.Inverse.value: return ReactionDirection.Inverse
349 return ReactionDirection.Unknown
350
351 @classmethod
352 def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
353 return ReactionDirection.fromDir(reactionId[-2:])
354
355 def getArrowBodyElementId(reactionId :str) -> str:
356 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
357 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
358 return f"R_{reactionId}"
359
360 def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
361 """
362 We attempt extracting the direction information from the provided reaction ID, if unsuccessful we provide the IDs of both directions.
363
364 Args:
365 reactionId : the provided reaction ID.
366
367 Returns:
368 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
369 """
370 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
371 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
372 return f"F_{reactionId}", f"B_{reactionId}"
373
374 class ArrowColor(Enum):
375 """
376 Encodes possible arrow colors based on their meaning in the enrichment process.
377 """
378 Invalid = "#BEBEBE" # gray, fold-change under treshold
379 UpRegulated = "#ecac68" # red, up-regulated reaction
380 DownRegulated = "#6495ed" # blue, down-regulated reaction
381
382 UpRegulatedInv = "#FF0000"
383 # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with
384 # conflicting enrichment in the two directions.
385
386 DownRegulatedInv = "#0000FF"
387 # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with
388 # conflicting enrichment in the two directions.
389
390 @classmethod
391 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
392 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv)
393 return colors[useAltColor]
394
395 def __str__(self) -> str: return self.value
396
397 class Arrow:
398 """
399 Models the properties of a reaction arrow that change based on enrichment.
400 """
401 MIN_W = 2
402 MAX_W = 12
403
404 def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
405 """
406 (Private) Initializes an instance of Arrow.
407
408 Args:
409 width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
410 col : color of the arrow.
411 isDashed : whether the arrow should be dashed, meaning the associated pValue resulted not significant.
412
413 Returns:
414 None : practically, a Arrow instance.
415 """
416 self.w = width
417 self.col = col
418 self.dash = isDashed
419
420 def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
421 if getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr)).isErr:
422 ERRORS.append(reactionId)
423
424 def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
425 # If We're dealing with RAS data or in general don't care about the direction of the reaction we only style the arrow body
426 if not mindReactionDir:
427 return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
428
429 # Now we style the arrow head(s):
430 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
431 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
432 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
433
434 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
435 """
436 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
437
438 Args:
439 reactionId: the reaction ID, as encoded in the dataset.
440 mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
441
442 Returns:
443 str : the ID of an arrow's body or tips in the map.
444 """
445 # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
446 if not mindReactionDir: return "R_" + reactionId
447
448 #TODO: this is clearly something we need to make consistent in RPS
449 return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[:-2] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
450
451 def toStyleStr(self, *, downSizedForTips = False) -> str:
452 """
453 Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
454
455 Returns:
456 str : the styles string.
457 """
458 width = self.w
459 if downSizedForTips: width *= 0.8
460 return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
461
462 # vvv These constants could be inside the class itself a static properties, but python
463 # was built by brainless organisms so here we are!
464 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
465 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
466
467 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
468 """
469 Applies RPS enrichment results to the provided metabolic map.
470
471 Args:
472 rpsEnrichmentRes : RPS enrichment results.
473 metabMap : the metabolic map to edit.
474 maxNumericZScore : biggest finite z-score value found.
475
476 Side effects:
477 metabMap : mut
478
479 Returns:
480 None
481 """
482 for reactionId, values in rpsEnrichmentRes.items():
483 pValue = values[0]
484 foldChange = values[1]
485 z_score = values[2]
486
487 if isinstance(foldChange, str): foldChange = float(foldChange)
488 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow
489 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
490 continue
491
492 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
493 INVALID_ARROW.styleReactionElements(metabMap, reactionId)
494 continue
495
496 width = Arrow.MAX_W
497 if not math.isinf(foldChange):
498 try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W)
499 except ZeroDivisionError: pass
500
501 if not reactionId.endswith("_RV"): # RV stands for reversible reactions
502 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
503 continue
504
505 reactionId = reactionId[:-3] # Remove "_RV"
506
507 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score
508 if inversionScore == 2: foldChange *= -1
509 # ^^^ Style the inverse direction with the opposite sign netValue
510
511 # If the score is 1 (opposite signs) we use alternative colors vvv
512 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
513
514 # vvv These 2 if statements can both be true and can both happen
515 if ARGS.net: # style arrow head(s):
516 arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))
517
518 if not ARGS.using_RAS: # style arrow body
519 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
520
521 ############################ split class ######################################
522 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
523 """
524 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
525
526 Args:
527 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
528 resolve_rules : a :dict containing :float data
529
530 Returns:
531 dict : the dict with data grouped by class
532
533 Side effects:
534 classes : mut
535 """
536 class_pat :Dict[str, List[List[float]]] = {}
537 for i in range(len(classes)):
538 classe :str = classes.iloc[i, 1]
539 if pd.isnull(classe): continue
540
541 l :List[List[float]] = []
542 for j in range(i, len(classes)):
543 if classes.iloc[j, 1] == classe:
544 pat_id :str = classes.iloc[j, 0]
545 tmp = resolve_rules.get(pat_id, None)
546 if tmp != None:
547 l.append(tmp)
548 classes.iloc[j, 1] = None
549
550 if l:
551 class_pat[classe] = list(map(list, zip(*l)))
552 continue
553
554 utils.logWarning(
555 f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)
556
557 return class_pat
558
559 ############################ conversion ##############################################
560 #conversion from svg to png
561 def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
562 """
563 Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.
564
565 Args:
566 svg_path : path to SVG file
567 png_path : path for new PNG file
568 dpi : dots per inch of the generated PNG
569 scale : scaling factor for the generated PNG, computed internally when a size is provided
570 size : final effective width of the generated PNG
571
572 Returns:
573 None
574 """
575 if size:
576 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
577 scale = size / image.width
578 image = image.resize(scale)
579 else:
580 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)
581
582 white_background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
583 white_background = white_background.affine([scale, 0, 0, scale])
584
585 if white_background.bands != image.bands:
586 white_background = white_background.extract_band(0)
587
588 composite_image = white_background.composite2(image, 'over')
589 composite_image.write_to_file(png_path.show())
590
591 #funzione unica, lascio fuori i file e li passo in input
592 #conversion from png to pdf
593 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
594 """
595 Internal utility to convert a PNG to PDF to aid from SVG conversion.
596
597 Args:
598 png_file : path to PNG file
599 pdf_file : path to new PDF file
600
601 Returns:
602 None
603 """
604 image = Image.open(png_file.show())
605 image = image.convert("RGB")
606 image.save(pdf_file.show(), "PDF", resolution=100.0)
607
608 #function called to reduce redundancy in the code
609 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
610 """
611 Converts the SVG map at the provided path to PDF.
612
613 Args:
614 file_svg : path to SVG file
615 file_png : path to PNG file
616 file_pdf : path to new PDF file
617
618 Returns:
619 None
620 """
621 svg_to_png_with_background(file_svg, file_png)
622 try:
623 convert_png_to_pdf(file_png, file_pdf)
624 print(f'PDF file {file_pdf.filePath} successfully generated.')
625
626 except Exception as e:
627 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
628
629 ############################ map ##############################################
630 def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
631 """
632 Builds a FilePath instance from the names of confronted datasets ready to point to a location in the
633 "result/" folder, used by this tool for output files in collections.
634
635 Args:
636 dataset1Name : _description_
637 dataset2Name : _description_. Defaults to "rest".
638 details : _description_
639 ext : _description_
640
641 Returns:
642 utils.FilePath : _description_
643 """
644 # This function returns a util data structure but is extremely specific to this module.
645 # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
646 # TODO: until a third tool with multiple outputs appears before porting this to utils.
647 return utils.FilePath(
648 f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
649 # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
650 # all output files: I don't care, this was never the performance bottleneck of the tool and
651 # there is no other net gain in saving and re-using the built string.
652 ext,
653 prefix = "result")
654
655 FIELD_NOT_AVAILABLE = '/'
656 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
657 fieldsAmt = len(fieldNames)
658 with open(outPath.show(), "w", newline = "") as fd:
659 writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
660 writer.writeheader()
661
662 for row in rows:
663 sizeMismatch = fieldsAmt - len(row)
664 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
665 writer.writerow({ field : data for field, data in zip(fieldNames, row) })
666
667 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
668 def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None:
669 fieldNames = ["ids", "P_Value", "fold change"]
670 if not ras_enrichment: fieldNames.extend(["average_1", "average_2"])
671
672 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
673
674 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
675 # this function compiles the things always in common between comparison modes after enrichment.
676 # TODO: organize, name better.
677 writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV))
678
679 if ras_enrichment:
680 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score)
681 return
682
683 for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
684 applyRpsEnrichmentToMap(tmp, core_map, max_z_score)
685
686 def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
687 """
688 Computes the statistical significance score (P-value) of the comparison between coherent data
689 from two datasets. The data is supposed to, in both datasets:
690 - be related to the same reaction ID;
691 - be ordered by sample, such that the item at position i in both lists is related to the
692 same sample or cell line.
693
694 Args:
695 dataset1Data : data from the 1st dataset.
696 dataset2Data : data from the 2nd dataset.
697
698 Returns:
699 tuple: (P-value, Z-score)
700 - P-value from a Kolmogorov-Smirnov test on the provided data.
701 - Z-score of the difference between means of the two datasets.
702 """
703 # Perform Kolmogorov-Smirnov test
704 ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data)
705
706 # Calculate means and standard deviations
707 mean1 = np.mean(dataset1Data)
708 mean2 = np.mean(dataset2Data)
709 std1 = np.std(dataset1Data, ddof=1)
710 std2 = np.std(dataset2Data, ddof=1)
711
712 n1 = len(dataset1Data)
713 n2 = len(dataset2Data)
714
715 # Calculate Z-score
716 z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))
717
718 return p_value, z_score
719
720 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
721 #TODO: the following code still suffers from "dumbvarnames-osis"
722 tmp :Dict[str, List[Union[float, FoldChange]]] = {}
723 count = 0
724 max_z_score = 0
725
726 for l1, l2 in zip(dataset1Data, dataset2Data):
727 reactId = ids[count]
728 count += 1
729 if not reactId: continue # we skip ids that have already been processed
730
731 try: #TODO: identify the source of these errors and minimize code in the try block
732 reactDir = ReactionDirection.fromReactionId(reactId)
733 # Net score is computed only for reversible reactions when user wants it on arrow tips or when RAS datasets aren't used
734 if (ARGS.net or not ARGS.using_RAS) and reactDir is not ReactionDirection.Unknown:
735 try: position = ids.index(reactId[:-1] + ('B' if reactDir is ReactionDirection.Direct else 'F'))
736 except ValueError: continue # we look for the complementary id, if not found we skip
737
738 nets1 = np.subtract(l1, dataset1Data[position])
739 nets2 = np.subtract(l2, dataset2Data[position])
740
741 p_value, z_score = computePValue(nets1, nets2)
742 avg1 = sum(nets1) / len(nets1)
743 avg2 = sum(nets2) / len(nets2)
744 net = fold_change(avg1, avg2)
745
746 if math.isnan(net): continue
747 tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2]
748
749 # vvv complementary directional ids are set to None once processed if net is to be applied to tips
750 if ARGS.net:
751 ids[position] = None
752 continue
753
754 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used
755 p_value, z_score = computePValue(l1, l2)
756 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2))
757 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
758 tmp[reactId] = [float(p_value), avg, z_score]
759
760 except (TypeError, ZeroDivisionError): continue
761
762 return tmp, max_z_score
763
764 def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str], *, fromRAS = True) -> None:
765 """
766 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the
767 provided metabolic map.
768
769 Args:
770 metabMap : SVG map to modify.
771 class_pat : the clustered data.
772 ids : ids for data association.
773 fromRAS : whether the data to enrich consists of RAS scores.
774
775 Returns:
776 None
777
778 Raises:
779 sys.exit : if there are less than 2 classes for comparison
780
781 Side effects:
782 metabMap : mut
783 ids : mut
784 """
785 class_pat = { k.strip() : v for k, v in class_pat.items() }
786 #TODO: simplfy this stuff vvv and stop using sys.exit (raise the correct utils error)
787 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: classes provided for comparisons are less than two\n')
788
789 if ARGS.comparison == "manyvsmany":
790 for i, j in it.combinations(class_pat.keys(), 2):
791 #TODO: these 2 functions are always called in pair and in this order and need common data,
792 # some clever refactoring would be appreciated.
793 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
794 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, i, j, fromRAS)
795
796 elif ARGS.comparison == "onevsrest":
797 for single_cluster in class_pat.keys():
798 t :List[List[List[float]]] = []
799 for k in class_pat.keys():
800 if k != single_cluster:
801 t.append(class_pat.get(k))
802
803 rest :List[List[float]] = []
804 for i in t:
805 rest = rest + i
806
807 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
808 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, single_cluster, fromRAS)
809
810 elif ARGS.comparison == "onevsmany":
811 controlItems = class_pat.get(ARGS.control)
812 for otherDataset in class_pat.keys():
813 if otherDataset == ARGS.control: continue
814
815 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
816 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, ARGS.control, otherDataset, fromRAS)
817
818 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
819 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG)
820 utils.writeSvg(svgFilePath, core_map)
821
822 if ARGS.generate_pdf:
823 pngPath = buildOutputPath(dataset1Name, dataset2Name, details = "PNG Map", ext = utils.FileFormat.PNG)
824 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details = "PDF Map", ext = utils.FileFormat.PDF)
825 convert_to_pdf(svgFilePath, pngPath, pdfPath)
826
827 if not ARGS.generate_svg: os.remove(svgFilePath.show())
828
829 ClassPat = Dict[str, List[List[float]]]
830 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
831 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
832 # for the sake of everyone's sanity.
833 class_pat :ClassPat = {}
834 if ARGS.option == 'datasets':
835 num = 1 #TODO: the dataset naming function could be a generator
836 for path, name in zip(datasetsPaths, names):
837 name = name_dataset(name, num)
838 resolve_rules_float, ids = getDatasetValues(path, name)
839 if resolve_rules_float != None:
840 class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
841
842 num += 1
843
844 elif ARGS.option == "dataset_class":
845 classes = read_dataset(classPath, "class")
846 classes = classes.astype(str)
847
848 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
849 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float)
850
851 return ids, class_pat
852 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
853
854 #TODO: create these damn args as FilePath objects
855 def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
856 """
857 Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
858
859 Args:
860 datasetPath : path to the dataset
861 datasetName (str): dataset name, used in error reporting
862
863 Returns:
864 Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
865 """
866 dataset = read_dataset(datasetPath, datasetName)
867 IDs = pd.Series.tolist(dataset.iloc[:, 0].astype(str))
868
869 dataset = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
870 return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
871
872 ############################ MAIN #############################################
873 def main() -> None:
874 """
875 Initializes everything and sets the program in motion based on the fronted input arguments.
876
877 Returns:
878 None
879
880 Raises:
881 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
882 """
883
884 global ARGS
885 ARGS = process_args()
886
887 if os.path.isdir('result') == False: os.makedirs('result')
888
889 core_map :ET.ElementTree = ARGS.choice_map.getMap(
890 ARGS.tool_dir,
891 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
892 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given.
893 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner
894 # solution can be derived from my comment in FilePath.fromStrPath
895
896 if ARGS.using_RAS:
897 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names)
898 computeEnrichment(core_map, class_pat, ids)
899
900 if ARGS.using_RPS:
901 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps)
902 computeEnrichment(core_map, class_pat, ids, fromRAS = False)
903
904 # create output files: TODO: this is the same comparison happening in "maps", find a better way to organize this
905 if ARGS.comparison == "manyvsmany":
906 for i, j in it.combinations(class_pat.keys(), 2): createOutputMaps(i, j, core_map)
907 return
908
909 if ARGS.comparison == "onevsrest":
910 for single_cluster in class_pat.keys(): createOutputMaps(single_cluster, "rest", core_map)
911 return
912
913 for otherDataset in class_pat.keys():
914 if otherDataset != ARGS.control: createOutputMaps(i, j, core_map)
915
916 if not ERRORS: return
917 utils.logWarning(
918 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}",
919 ARGS.out_log)
920
921 print('Execution succeded')
922
923 ###############################################################################
924 if __name__ == "__main__":
925 main()