Mercurial > repos > bimib > cobraxy
comparison COBRAxy/marea.py @ 290:71ec0b6ddde9 draft
Uploaded
author | francesco_lapi |
---|---|
date | Wed, 14 May 2025 10:43:04 +0000 |
parents | 3f5cfcf4d9eb |
children | 7f3e66dd46fa |
comparison
equal
deleted
inserted
replaced
289:3f5cfcf4d9eb | 290:71ec0b6ddde9 |
---|---|
46 | 46 |
47 #Computation details: | 47 #Computation details: |
48 parser.add_argument( | 48 parser.add_argument( |
49 '-co', '--comparison', | 49 '-co', '--comparison', |
50 type = str, | 50 type = str, |
51 default = 'manyvsmany', | 51 default = '1vs1', |
52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) | 52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) |
53 | 53 |
54 parser.add_argument( | 54 parser.add_argument( |
55 '-pv' ,'--pValue', | 55 '-pv' ,'--pValue', |
56 type = float, | 56 type = float, |
183 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 183 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
184 if len(dataset.columns) < 2: | 184 if len(dataset.columns) < 2: |
185 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 185 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
186 return dataset | 186 return dataset |
187 | 187 |
188 ############################ dataset name ##################################### | |
189 def name_dataset(name_data :str, count :int) -> str: | |
190 """ | |
191 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | |
192 | |
193 Args: | |
194 name_data : name associated with the dataset (from frontend input params) | |
195 count : counter from 1 to make these names unique (external) | |
196 | |
197 Returns: | |
198 str : the name made unique | |
199 """ | |
200 if str(name_data) == 'Dataset': | |
201 return str(name_data) + '_' + str(count) | |
202 else: | |
203 return str(name_data) | |
204 | |
188 ############################ map_methods ###################################### | 205 ############################ map_methods ###################################### |
189 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] | 206 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] |
190 def fold_change(avg1 :float, avg2 :float) -> FoldChange: | 207 def fold_change(avg1 :float, avg2 :float) -> FoldChange: |
191 """ | 208 """ |
192 Calculates the fold change between two gene expression values. | 209 Calculates the fold change between two gene expression values. |
201 "INF" : when avg2 is 0 | 218 "INF" : when avg2 is 0 |
202 float : for any other combination of values | 219 float : for any other combination of values |
203 """ | 220 """ |
204 if avg1 == 0 and avg2 == 0: | 221 if avg1 == 0 and avg2 == 0: |
205 return 0 | 222 return 0 |
206 | 223 elif avg1 == 0: |
207 if avg1 == 0: | 224 return '-INF' |
208 return '-INF' # TODO: maybe fix | 225 elif avg2 == 0: |
209 | |
210 if avg2 == 0: | |
211 return 'INF' | 226 return 'INF' |
212 | 227 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 |
213 # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 | 228 return (avg1 - avg2) / (abs(avg1) + abs(avg2)) |
214 fc = (avg1 - avg2) / (abs(avg1) + abs(avg2)) | 229 |
215 if avg1 < 0 and avg2 < 0: fc *= -1 | |
216 return fc | |
217 | |
218 # TODO: I would really like for this one to get the Thanos treatment | |
219 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: | 230 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: |
220 """ | 231 """ |
221 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. | 232 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. |
222 | 233 |
223 Args: | 234 Args: |
249 tmp.append('stroke-width:' + width) | 260 tmp.append('stroke-width:' + width) |
250 if not flag_dash: | 261 if not flag_dash: |
251 tmp.append('stroke-dasharray:' + dash) | 262 tmp.append('stroke-dasharray:' + dash) |
252 return ';'.join(tmp) | 263 return ';'.join(tmp) |
253 | 264 |
254 # TODO: remove, there's applyRPS whatever | |
255 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! | 265 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! |
256 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: | 266 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: |
257 """ | 267 """ |
258 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. | 268 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. |
259 | 269 |
278 for el in core_map.iter(): | 288 for el in core_map.iter(): |
279 el_id = str(el.get('id')) | 289 el_id = str(el.get('id')) |
280 if el_id.startswith('R_'): | 290 if el_id.startswith('R_'): |
281 tmp = d.get(el_id[2:]) | 291 tmp = d.get(el_id[2:]) |
282 if tmp != None: | 292 if tmp != None: |
283 p_val, f_c, z_score, avg1, avg2 = tmp | 293 p_val :float = tmp[0] |
294 f_c = tmp[1] | |
295 z_score = tmp[2] | |
284 | 296 |
285 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue | 297 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue |
286 | 298 |
287 if p_val <= threshold_P_V: # p-value is OK | 299 if p_val <= threshold_P_V: |
288 if not isinstance(f_c, str): # FC is finite | 300 if not isinstance(f_c, str): |
289 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK | 301 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # |
290 col = grey | 302 col = grey |
291 width = str(minT) | 303 width = str(minT) |
292 else: # FC is OK | 304 else: |
293 if f_c < 0: | 305 if f_c < 0: |
294 col = blue | 306 col = blue |
295 elif f_c > 0: | 307 elif f_c > 0: |
296 col = red | 308 col = red |
297 width = str( | 309 width = str(max((abs(z_score) * maxT) / max_z_score, minT)) |
298 min( | 310 else: |
299 max(abs(z_score * maxT) / max_z_score, minT), | |
300 maxT)) | |
301 | |
302 else: # FC is infinite | |
303 if f_c == '-INF': | 311 if f_c == '-INF': |
304 col = blue | 312 col = blue |
305 elif f_c == 'INF': | 313 elif f_c == 'INF': |
306 col = red | 314 col = red |
307 width = str(maxT) | 315 width = str(maxT) |
308 dash = 'none' | 316 dash = 'none' |
309 else: # p-value is not OK | 317 else: |
310 dash = '5,5' | 318 dash = '5,5' |
311 col = grey | 319 col = grey |
312 width = str(minT) | 320 width = str(minT) |
313 el.set('style', fix_style(el.get('style', ""), col, width, dash)) | 321 el.set('style', fix_style(el.get('style', ""), col, width, dash)) |
314 return core_map | 322 return core_map |
324 | 332 |
325 Returns: | 333 Returns: |
326 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. | 334 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. |
327 """ | 335 """ |
328 return utils.Result.Ok( | 336 return utils.Result.Ok( |
329 lambda: metabMap.xpath(f"//*[@id=\"{reactionId}\"]")[0]).mapErr( | 337 f"//*[@id=\"{reactionId}\"]").map( |
338 lambda xPath : metabMap.xpath(xPath)[0]).mapErr( | |
330 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) | 339 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) |
331 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. | 340 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. |
332 | 341 |
333 def styleMapElement(element :ET.Element, styleStr :str) -> None: | 342 def styleMapElement(element :ET.Element, styleStr :str) -> None: |
334 currentStyles :str = element.get("style", "") | 343 currentStyles :str = element.get("style", "") |
335 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): | 344 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): |
336 currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure? | 345 currentStyles = ';'.join(currentStyles.split(';')[:-3]) |
337 | |
338 #TODO: this is attempting to solve the styling override problem, not sure it does tho | |
339 | 346 |
340 element.set("style", currentStyles + styleStr) | 347 element.set("style", currentStyles + styleStr) |
341 | 348 |
342 # TODO: maybe remove vvv | |
343 class ReactionDirection(Enum): | 349 class ReactionDirection(Enum): |
344 Unknown = "" | 350 Unknown = "" |
345 Direct = "_F" | 351 Direct = "_F" |
346 Inverse = "_B" | 352 Inverse = "_B" |
347 | 353 |
370 | 376 |
371 Returns: | 377 Returns: |
372 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. | 378 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. |
373 """ | 379 """ |
374 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV | 380 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV |
375 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: | 381 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], "" |
376 return reactionId[:-3:-1] + reactionId[:-2], "" # ^^^ Invert _F to F_ | |
377 | |
378 return f"F_{reactionId}", f"B_{reactionId}" | 382 return f"F_{reactionId}", f"B_{reactionId}" |
379 | 383 |
380 class ArrowColor(Enum): | 384 class ArrowColor(Enum): |
381 """ | 385 """ |
382 Encodes possible arrow colors based on their meaning in the enrichment process. | 386 Encodes possible arrow colors based on their meaning in the enrichment process. |
383 """ | 387 """ |
384 Invalid = "#BEBEBE" # gray, fold-change under treshold | 388 Invalid = "#BEBEBE" # gray, fold-change under treshold |
385 UpRegulated = "#ecac68" # orange, up-regulated reaction | 389 UpRegulated = "#ecac68" # red, up-regulated reaction |
386 DownRegulated = "#6495ed" # lightblue, down-regulated reaction | 390 DownRegulated = "#6495ed" # blue, down-regulated reaction |
387 | 391 |
388 UpRegulatedInv = "#FF0000" | 392 UpRegulatedInv = "#FF0000" |
389 # ^^^ bright red, up-regulated net value for a reversible reaction with | 393 # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with |
390 # conflicting enrichment in the two directions. | 394 # conflicting enrichment in the two directions. |
391 | 395 |
392 DownRegulatedInv = "#0000FF" | 396 DownRegulatedInv = "#0000FF" |
393 # ^^^ bright blue, down-regulated net value for a reversible reaction with | 397 # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with |
394 # conflicting enrichment in the two directions. | 398 # conflicting enrichment in the two directions. |
395 | 399 |
396 @classmethod | 400 @classmethod |
397 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": | 401 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": |
398 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) | 402 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) |
435 # Now we style the arrow head(s): | 439 # Now we style the arrow head(s): |
436 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) | 440 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) |
437 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) | 441 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) |
438 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) | 442 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) |
439 | 443 |
440 # TODO: this seems to be unused, remove | |
441 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: | 444 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: |
442 """ | 445 """ |
443 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. | 446 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. |
444 | 447 |
445 Args: | 448 Args: |
469 # vvv These constants could be inside the class itself a static properties, but python | 472 # vvv These constants could be inside the class itself a static properties, but python |
470 # was built by brainless organisms so here we are! | 473 # was built by brainless organisms so here we are! |
471 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) | 474 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) |
472 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) | 475 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) |
473 | 476 |
474 # TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever | |
475 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: | 477 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: |
476 """ | 478 """ |
477 Applies RPS enrichment results to the provided metabolic map. | 479 Applies RPS enrichment results to the provided metabolic map. |
478 | 480 |
479 Args: | 481 Args: |
497 if isinstance(foldChange, str): foldChange = float(foldChange) | 499 if isinstance(foldChange, str): foldChange = float(foldChange) |
498 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow | 500 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow |
499 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) | 501 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) |
500 continue | 502 continue |
501 | 503 |
502 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): | 504 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): |
503 INVALID_ARROW.styleReactionElements(metabMap, reactionId) | 505 INVALID_ARROW.styleReactionElements(metabMap, reactionId) |
504 continue | 506 continue |
505 | 507 |
506 width = Arrow.MAX_W | 508 width = Arrow.MAX_W |
507 if not math.isinf(foldChange): | 509 if not math.isinf(foldChange): |
508 try: width = min( | 510 try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) |
509 max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W), | |
510 Arrow.MAX_W) | |
511 | |
512 except ZeroDivisionError: pass | 511 except ZeroDivisionError: pass |
513 | 512 |
514 if not reactionId.endswith("_RV"): # RV stands for reversible reactions | 513 if not reactionId.endswith("_RV"): # RV stands for reversible reactions |
515 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) | 514 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) |
516 continue | 515 continue |
517 | 516 |
518 reactionId = reactionId[:-3] # Remove "_RV" | 517 reactionId = reactionId[:-3] # Remove "_RV" |
519 | 518 |
520 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score | 519 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score |
520 if inversionScore == 2: foldChange *= -1 | |
521 # ^^^ Style the inverse direction with the opposite sign netValue | |
521 | 522 |
522 # If the score is 1 (opposite signs) we use alternative colors vvv | 523 # If the score is 1 (opposite signs) we use alternative colors vvv |
523 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) | 524 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) |
524 | 525 |
525 # vvv These 2 if statements can both be true and can both happen | 526 # vvv These 2 if statements can both be true and can both happen |
528 | 529 |
529 if not ARGS.using_RAS: # style arrow body | 530 if not ARGS.using_RAS: # style arrow body |
530 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) | 531 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) |
531 | 532 |
532 ############################ split class ###################################### | 533 ############################ split class ###################################### |
533 def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: | 534 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: |
534 """ | 535 """ |
535 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. | 536 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. |
536 | 537 |
537 Args: | 538 Args: |
538 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict | 539 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict |
539 dataset_values : a :dict containing :float data | 540 resolve_rules : a :dict containing :float data |
540 | 541 |
541 Returns: | 542 Returns: |
542 dict : the dict with data grouped by class | 543 dict : the dict with data grouped by class |
543 | 544 |
544 Side effects: | 545 Side effects: |
550 if pd.isnull(classe): continue | 551 if pd.isnull(classe): continue |
551 | 552 |
552 l :List[List[float]] = [] | 553 l :List[List[float]] = [] |
553 for j in range(i, len(classes)): | 554 for j in range(i, len(classes)): |
554 if classes.iloc[j, 1] == classe: | 555 if classes.iloc[j, 1] == classe: |
555 pat_id :str = classes.iloc[j, 0] # sample name | 556 pat_id :str = classes.iloc[j, 0] |
556 values = dataset_values.get(pat_id, None) # the column of values for that sample | 557 tmp = resolve_rules.get(pat_id, None) |
557 if values != None: | 558 if tmp != None: |
558 l.append(values) | 559 l.append(tmp) |
559 classes.iloc[j, 1] = None # TODO: problems? | 560 classes.iloc[j, 1] = None |
560 | 561 |
561 if l: | 562 if l: |
562 class_pat[classe] = list(map(list, zip(*l))) | 563 class_pat[classe] = list(map(list, zip(*l))) |
563 continue | 564 continue |
564 | 565 |
597 white_background = white_background.extract_band(0) | 598 white_background = white_background.extract_band(0) |
598 | 599 |
599 composite_image = white_background.composite2(image, 'over') | 600 composite_image = white_background.composite2(image, 'over') |
600 composite_image.write_to_file(png_path.show()) | 601 composite_image.write_to_file(png_path.show()) |
601 | 602 |
603 #funzione unica, lascio fuori i file e li passo in input | |
604 #conversion from png to pdf | |
605 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None: | |
606 """ | |
607 Internal utility to convert a PNG to PDF to aid from SVG conversion. | |
608 | |
609 Args: | |
610 png_file : path to PNG file | |
611 pdf_file : path to new PDF file | |
612 | |
613 Returns: | |
614 None | |
615 """ | |
616 image = Image.open(png_file.show()) | |
617 image = image.convert("RGB") | |
618 image.save(pdf_file.show(), "PDF", resolution=100.0) | |
619 | |
620 #function called to reduce redundancy in the code | |
602 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: | 621 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: |
603 """ | 622 """ |
604 Converts the SVG map at the provided path to PDF. | 623 Converts the SVG map at the provided path to PDF. |
605 | 624 |
606 Args: | 625 Args: |
611 Returns: | 630 Returns: |
612 None | 631 None |
613 """ | 632 """ |
614 svg_to_png_with_background(file_svg, file_png) | 633 svg_to_png_with_background(file_svg, file_png) |
615 try: | 634 try: |
616 image = Image.open(file_png.show()) | 635 convert_png_to_pdf(file_png, file_pdf) |
617 image = image.convert("RGB") | |
618 image.save(file_pdf.show(), "PDF", resolution=100.0) | |
619 print(f'PDF file {file_pdf.filePath} successfully generated.') | 636 print(f'PDF file {file_pdf.filePath} successfully generated.') |
620 | 637 |
621 except Exception as e: | 638 except Exception as e: |
622 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') | 639 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') |
623 | 640 |
658 sizeMismatch = fieldsAmt - len(row) | 675 sizeMismatch = fieldsAmt - len(row) |
659 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) | 676 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) |
660 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) | 677 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) |
661 | 678 |
662 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible | 679 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible |
663 def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: | 680 def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None: |
681 fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"] | |
682 | |
683 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath) | |
684 | |
685 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: | |
664 # this function compiles the things always in common between comparison modes after enrichment. | 686 # this function compiles the things always in common between comparison modes after enrichment. |
665 # TODO: organize, name better. | 687 # TODO: organize, name better. |
666 suffix = "RAS" if ras_enrichment else "RPS" | 688 suffix = "RAS" if ras_enrichment else "RPS" |
667 writeToCsv( | 689 details = f"Tabular Result ({suffix})" |
668 [ [reactId] + values for reactId, values in tmp.items() ], | 690 |
669 ["ids", "P_Value", "fold change", "average_1", "average_2"], | 691 writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV)) |
670 buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV)) | |
671 | 692 |
672 if ras_enrichment: | 693 if ras_enrichment: |
673 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) | 694 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) |
674 return | 695 return |
675 | 696 |
710 | 731 |
711 return p_value, z_score | 732 return p_value, z_score |
712 | 733 |
713 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: | 734 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: |
714 #TODO: the following code still suffers from "dumbvarnames-osis" | 735 #TODO: the following code still suffers from "dumbvarnames-osis" |
715 comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} | 736 tmp :Dict[str, List[Union[float, FoldChange]]] = {} |
716 count = 0 | 737 count = 0 |
717 max_z_score = 0 | 738 max_z_score = 0 |
718 | 739 |
719 for l1, l2 in zip(dataset1Data, dataset2Data): | 740 for l1, l2 in zip(dataset1Data, dataset2Data): |
720 reactId = ids[count] | 741 reactId = ids[count] |
735 avg1 = sum(nets1) / len(nets1) | 756 avg1 = sum(nets1) / len(nets1) |
736 avg2 = sum(nets2) / len(nets2) | 757 avg2 = sum(nets2) / len(nets2) |
737 net = fold_change(avg1, avg2) | 758 net = fold_change(avg1, avg2) |
738 | 759 |
739 if math.isnan(net): continue | 760 if math.isnan(net): continue |
740 comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] | 761 tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] |
741 | 762 |
742 # vvv complementary directional ids are set to None once processed if net is to be applied to tips | 763 # vvv complementary directional ids are set to None once processed if net is to be applied to tips |
743 if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows | 764 if ARGS.net: |
744 ids[position] = None | 765 ids[position] = None |
745 continue | 766 continue |
746 | 767 |
747 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used | 768 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used |
748 p_value, z_score = computePValue(l1, l2) | 769 p_value, z_score = computePValue(l1, l2) |
749 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) | 770 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) |
750 # vvv TODO: Check numpy version compatibility | 771 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) |
751 if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score) | 772 tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] |
752 comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] | |
753 | 773 |
754 except (TypeError, ZeroDivisionError): continue | 774 except (TypeError, ZeroDivisionError): continue |
755 | 775 |
756 return comparisonResult, max_z_score | 776 return tmp, max_z_score |
757 | 777 |
758 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: | 778 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: |
759 """ | 779 """ |
760 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the | 780 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the |
761 provided metabolic map. | 781 provided metabolic map. |
803 utils.writeSvg(svgFilePath, core_map) | 823 utils.writeSvg(svgFilePath, core_map) |
804 | 824 |
805 if ARGS.generate_pdf: | 825 if ARGS.generate_pdf: |
806 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) | 826 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) |
807 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) | 827 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) |
808 svg_to_png_with_background(svgFilePath, pngPath) | 828 convert_to_pdf(svgFilePath, pngPath, pdfPath) |
809 try: | 829 |
810 image = Image.open(pngPath.show()) | 830 if not ARGS.generate_svg: |
811 image = image.convert("RGB") | |
812 image.save(pdfPath.show(), "PDF", resolution=100.0) | |
813 print(f'PDF file {pdfPath.filePath} successfully generated.') | |
814 | |
815 except Exception as e: | |
816 raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}') | |
817 | |
818 if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not | |
819 os.remove(svgFilePath.show()) | 831 os.remove(svgFilePath.show()) |
820 | 832 |
821 ClassPat = Dict[str, List[List[float]]] | 833 ClassPat = Dict[str, List[List[float]]] |
822 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: | 834 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: |
823 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, | 835 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, |
824 # for the sake of everyone's sanity. | 836 # for the sake of everyone's sanity. |
825 class_pat :ClassPat = {} | 837 class_pat :ClassPat = {} |
826 if ARGS.option == 'datasets': | 838 if ARGS.option == 'datasets': |
827 num = 1 | 839 num = 1 #TODO: the dataset naming function could be a generator |
828 for path, name in zip(datasetsPaths, names): | 840 for path, name in zip(datasetsPaths, names): |
829 name = str(name) | 841 name = name_dataset(name, num) |
830 if name == 'Dataset': | 842 resolve_rules_float, ids = getDatasetValues(path, name) |
831 name += '_' + str(num) | 843 if resolve_rules_float != None: |
832 | 844 class_pat[name] = list(map(list, zip(*resolve_rules_float.values()))) |
833 values, ids = getDatasetValues(path, name) | 845 |
834 if values != None: | |
835 class_pat[name] = list(map(list, zip(*values.values()))) | |
836 # TODO: ??? | |
837 | |
838 num += 1 | 846 num += 1 |
839 | 847 |
840 elif ARGS.option == "dataset_class": | 848 elif ARGS.option == "dataset_class": |
841 classes = read_dataset(classPath, "class") | 849 classes = read_dataset(classPath, "class") |
842 classes = classes.astype(str) | 850 classes = classes.astype(str) |
843 | 851 |
844 values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") | 852 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") |
845 if values != None: class_pat = split_class(classes, values) | 853 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float) |
846 | 854 |
847 return ids, class_pat | 855 return ids, class_pat |
848 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) | 856 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) |
849 | 857 |
850 #TODO: create these damn args as FilePath objects | 858 #TODO: create these damn args as FilePath objects |
877 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) | 885 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) |
878 """ | 886 """ |
879 global ARGS | 887 global ARGS |
880 ARGS = process_args(args) | 888 ARGS = process_args(args) |
881 | 889 |
882 # Create output folder | |
883 if not os.path.isdir(ARGS.output_path): | 890 if not os.path.isdir(ARGS.output_path): |
884 os.makedirs(ARGS.output_path, exist_ok=True) | 891 os.makedirs(ARGS.output_path, exist_ok=True) |
885 | 892 |
886 core_map: ET.ElementTree = ARGS.choice_map.getMap( | 893 core_map: ET.ElementTree = ARGS.choice_map.getMap( |
887 ARGS.tool_dir, | 894 ARGS.tool_dir, |
888 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) | 895 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) |
889 | 896 |
890 # TODO: in the future keep the indices WITH the data and fix the code below. | |
891 | |
892 # Prepare enrichment results containers | 897 # Prepare enrichment results containers |
893 ras_results = [] | 898 ras_results = [] |
894 rps_results = [] | 899 rps_results = [] |
895 | 900 |
896 # Compute RAS enrichment if requested | 901 # Compute RAS enrichment if requested |
905 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) | 910 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) |
906 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) | 911 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) |
907 | 912 |
908 # Organize by comparison pairs | 913 # Organize by comparison pairs |
909 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} | 914 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} |
910 for i, j, comparison_data, max_z_score in ras_results: | 915 for i, j, d, mz in ras_results: |
911 comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None} | 916 comparisons[(i, j)] = {'ras': (d, mz), 'rps': None} |
912 for i, j, comparison_data, max_z_score in rps_results: | 917 for i, j, d, mz in rps_results: |
913 comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)}) | 918 comparisons.setdefault((i, j), {}).update({'rps': (d, mz)}) |
914 | 919 |
915 # For each comparison, create a styled map with RAS bodies and RPS heads | 920 # For each comparison, create a styled map with RAS bodies and RPS heads |
916 for (i, j), res in comparisons.items(): | 921 for (i, j), res in comparisons.items(): |
917 map_copy = copy.deepcopy(core_map) | 922 map_copy = copy.deepcopy(core_map) |
918 | 923 |
922 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) | 927 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) |
923 | 928 |
924 # Apply RPS styling to arrow heads | 929 # Apply RPS styling to arrow heads |
925 if res.get('rps'): | 930 if res.get('rps'): |
926 tmp_rps, max_z_rps = res['rps'] | 931 tmp_rps, max_z_rps = res['rps'] |
927 # applyRpsEnrichmentToMap styles only heads unless only RPS are used | 932 # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed |
928 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) | 933 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) |
929 | 934 |
930 # Output both SVG and PDF/PNG as configured | 935 # Output both SVG and PDF/PNG as configured |
931 createOutputMaps(i, j, map_copy) | 936 createOutputMaps(i, j, map_copy) |
932 print('Execution succeeded') | 937 print('Execution succeeded') |