| 
4
 | 
     1 from __future__ import division
 | 
| 
 | 
     2 import csv
 | 
| 
 | 
     3 from enum import Enum
 | 
| 
 | 
     4 import re
 | 
| 
 | 
     5 import sys
 | 
| 
 | 
     6 import numpy as np
 | 
| 
 | 
     7 import pandas as pd
 | 
| 
 | 
     8 import itertools as it
 | 
| 
 | 
     9 import scipy.stats as st
 | 
| 
 | 
    10 import lxml.etree as ET
 | 
| 
 | 
    11 import math
 | 
| 
 | 
    12 import utils.general_utils as utils
 | 
| 
 | 
    13 from PIL import Image
 | 
| 
 | 
    14 import os
 | 
| 
 | 
    15 import argparse
 | 
| 
 | 
    16 import pyvips
 | 
| 
 | 
    17 from typing import Tuple, Union, Optional, List, Dict
 | 
| 
143
 | 
    18 import copy
 | 
| 
4
 | 
    19 
 | 
| 
 | 
ERRORS = []  # reaction IDs whose SVG element could not be found/styled (appended to by Arrow.applyTo)
########################## argparse ##########################################
ARGS :argparse.Namespace  # global CLI arguments; assigned outside this chunk — presumably set to process_args() result by the entry point, verify
 | 
| 
147
 | 
    23 def process_args(args:List[str] = None) -> argparse.Namespace:
 | 
| 
4
 | 
    24     """
 | 
| 
 | 
    25     Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
 | 
| 
 | 
    26 
 | 
| 
 | 
    27     Args:
 | 
| 
 | 
    28         args : Always obtained (in file) from sys.argv
 | 
| 
 | 
    29 
 | 
| 
 | 
    30     Returns:
 | 
| 
 | 
    31         Namespace : An object containing the parsed arguments
 | 
| 
 | 
    32     """
 | 
| 
 | 
    33     parser = argparse.ArgumentParser(
 | 
| 
 | 
    34         usage = "%(prog)s [options]",
 | 
| 
 | 
    35         description = "process some value's genes to create a comparison's map.")
 | 
| 
 | 
    36     
 | 
| 
 | 
    37     #General:
 | 
| 
 | 
    38     parser.add_argument(
 | 
| 
 | 
    39         '-td', '--tool_dir',
 | 
| 
 | 
    40         type = str,
 | 
| 
 | 
    41         required = True,
 | 
| 
 | 
    42         help = 'your tool directory')
 | 
| 
 | 
    43     
 | 
| 
 | 
    44     parser.add_argument('-on', '--control', type = str)
 | 
| 
 | 
    45     parser.add_argument('-ol', '--out_log', help = "Output log")
 | 
| 
 | 
    46 
 | 
| 
 | 
    47     #Computation details:
 | 
| 
 | 
    48     parser.add_argument(
 | 
| 
 | 
    49         '-co', '--comparison',
 | 
| 
 | 
    50         type = str, 
 | 
| 
 | 
    51         default = '1vs1',
 | 
| 
 | 
    52         choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
 | 
| 
 | 
    53     
 | 
| 
 | 
    54     parser.add_argument(
 | 
| 
 | 
    55         '-pv' ,'--pValue',
 | 
| 
 | 
    56         type = float, 
 | 
| 
 | 
    57         default = 0.1, 
 | 
| 
 | 
    58         help = 'P-Value threshold (default: %(default)s)')
 | 
| 
 | 
    59     
 | 
| 
 | 
    60     parser.add_argument(
 | 
| 
 | 
    61         '-fc', '--fChange',
 | 
| 
 | 
    62         type = float, 
 | 
| 
 | 
    63         default = 1.5, 
 | 
| 
 | 
    64         help = 'Fold-Change threshold (default: %(default)s)')
 | 
| 
 | 
    65     
 | 
| 
 | 
    66     parser.add_argument(
 | 
| 
 | 
    67         "-ne", "--net",
 | 
| 
 | 
    68         type = utils.Bool("net"), default = False,
 | 
| 
 | 
    69         help = "choose if you want net enrichment for RPS")
 | 
| 
 | 
    70 
 | 
| 
 | 
    71     parser.add_argument(
 | 
| 
 | 
    72         '-op', '--option',
 | 
| 
 | 
    73         type = str, 
 | 
| 
 | 
    74         choices = ['datasets', 'dataset_class'],
 | 
| 
 | 
    75         help='dataset or dataset and class')
 | 
| 
 | 
    76     
 | 
| 
 | 
    77     #RAS:
 | 
| 
 | 
    78     parser.add_argument(
 | 
| 
 | 
    79         "-ra", "--using_RAS",
 | 
| 
 | 
    80         type = utils.Bool("using_RAS"), default = True,
 | 
| 
 | 
    81         help = "choose whether to use RAS datasets.")
 | 
| 
 | 
    82 
 | 
| 
 | 
    83     parser.add_argument(
 | 
| 
 | 
    84         '-id', '--input_data',
 | 
| 
 | 
    85         type = str,
 | 
| 
 | 
    86         help = 'input dataset')
 | 
| 
 | 
    87     
 | 
| 
 | 
    88     parser.add_argument(
 | 
| 
 | 
    89         '-ic', '--input_class',
 | 
| 
 | 
    90         type = str, 
 | 
| 
 | 
    91         help = 'sample group specification')
 | 
| 
 | 
    92     
 | 
| 
 | 
    93     parser.add_argument(
 | 
| 
 | 
    94         '-ids', '--input_datas',
 | 
| 
 | 
    95         type = str,
 | 
| 
 | 
    96         nargs = '+', 
 | 
| 
 | 
    97         help = 'input datasets')
 | 
| 
 | 
    98     
 | 
| 
 | 
    99     parser.add_argument(
 | 
| 
 | 
   100         '-na', '--names',
 | 
| 
 | 
   101         type = str,
 | 
| 
 | 
   102         nargs = '+', 
 | 
| 
 | 
   103         help = 'input names')
 | 
| 
 | 
   104     
 | 
| 
 | 
   105     #RPS:
 | 
| 
 | 
   106     parser.add_argument(
 | 
| 
 | 
   107         "-rp", "--using_RPS",
 | 
| 
 | 
   108         type = utils.Bool("using_RPS"), default = False,
 | 
| 
 | 
   109         help = "choose whether to use RPS datasets.")
 | 
| 
 | 
   110     
 | 
| 
 | 
   111     parser.add_argument(
 | 
| 
 | 
   112         '-idr', '--input_data_rps',
 | 
| 
 | 
   113         type = str,
 | 
| 
 | 
   114         help = 'input dataset rps')
 | 
| 
 | 
   115     
 | 
| 
 | 
   116     parser.add_argument(
 | 
| 
 | 
   117         '-icr', '--input_class_rps', 
 | 
| 
 | 
   118         type = str,
 | 
| 
 | 
   119         help = 'sample group specification rps')
 | 
| 
 | 
   120     
 | 
| 
 | 
   121     parser.add_argument(
 | 
| 
 | 
   122         '-idsr', '--input_datas_rps', 
 | 
| 
 | 
   123         type = str,
 | 
| 
 | 
   124         nargs = '+', 
 | 
| 
 | 
   125         help = 'input datasets rps')
 | 
| 
 | 
   126     
 | 
| 
 | 
   127     parser.add_argument(
 | 
| 
 | 
   128         '-nar', '--names_rps', 
 | 
| 
 | 
   129         type = str,
 | 
| 
 | 
   130         nargs = '+', 
 | 
| 
 | 
   131         help = 'input names rps')
 | 
| 
 | 
   132     
 | 
| 
 | 
   133     #Output:
 | 
| 
 | 
   134     parser.add_argument(
 | 
| 
 | 
   135         "-gs", "--generate_svg",
 | 
| 
 | 
   136         type = utils.Bool("generate_svg"), default = True,
 | 
| 
 | 
   137         help = "choose whether to use RAS datasets.")
 | 
| 
 | 
   138     
 | 
| 
 | 
   139     parser.add_argument(
 | 
| 
 | 
   140         "-gp", "--generate_pdf",
 | 
| 
 | 
   141         type = utils.Bool("generate_pdf"), default = True,
 | 
| 
 | 
   142         help = "choose whether to use RAS datasets.")
 | 
| 
 | 
   143     
 | 
| 
 | 
   144     parser.add_argument(
 | 
| 
 | 
   145         '-cm', '--custom_map',
 | 
| 
 | 
   146         type = str,
 | 
| 
 | 
   147         help='custom map to use')
 | 
| 
 | 
   148     
 | 
| 
 | 
   149     parser.add_argument(
 | 
| 
146
 | 
   150         '-idop', '--output_path', 
 | 
| 
 | 
   151         type = str,
 | 
| 
 | 
   152         default='result',
 | 
| 
 | 
   153         help = 'output path for maps')
 | 
| 
 | 
   154     
 | 
| 
 | 
   155     parser.add_argument(
 | 
| 
4
 | 
   156         '-mc',  '--choice_map',
 | 
| 
 | 
   157         type = utils.Model, default = utils.Model.HMRcore,
 | 
| 
 | 
   158         choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])
 | 
| 
 | 
   159 
 | 
| 
146
 | 
   160     args :argparse.Namespace = parser.parse_args(args)
 | 
| 
4
 | 
   161     if args.using_RAS and not args.using_RPS: args.net = False
 | 
| 
 | 
   162 
 | 
| 
 | 
   163     return args
 | 
| 
 | 
   164           
 | 
| 
 | 
   165 ############################ dataset input ####################################
 | 
| 
 | 
   166 def read_dataset(data :str, name :str) -> pd.DataFrame:
 | 
| 
 | 
   167     """
 | 
| 
 | 
   168     Tries to read the dataset from its path (data) as a tsv and turns it into a DataFrame.
 | 
| 
 | 
   169 
 | 
| 
 | 
   170     Args:
 | 
| 
 | 
   171         data : filepath of a dataset (from frontend input params or literals upon calling)
 | 
| 
 | 
   172         name : name associated with the dataset (from frontend input params or literals upon calling)
 | 
| 
 | 
   173 
 | 
| 
 | 
   174     Returns:
 | 
| 
 | 
   175         pd.DataFrame : dataset in a runtime operable shape
 | 
| 
 | 
   176     
 | 
| 
 | 
   177     Raises:
 | 
| 
 | 
   178         sys.exit : if there's no data (pd.errors.EmptyDataError) or if the dataset has less than 2 columns
 | 
| 
 | 
   179     """
 | 
| 
 | 
   180     try:
 | 
| 
 | 
   181         dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
 | 
| 
 | 
   182     except pd.errors.EmptyDataError:
 | 
| 
 | 
   183         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
   184     if len(dataset.columns) < 2:
 | 
| 
 | 
   185         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
   186     return dataset
 | 
| 
 | 
   187 
 | 
| 
 | 
   188 ############################ dataset name #####################################
 | 
| 
 | 
   189 def name_dataset(name_data :str, count :int) -> str:
 | 
| 
 | 
   190     """
 | 
| 
 | 
   191     Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
 | 
| 
 | 
   192 
 | 
| 
 | 
   193     Args:
 | 
| 
 | 
   194         name_data : name associated with the dataset (from frontend input params)
 | 
| 
 | 
   195         count : counter from 1 to make these names unique (external)
 | 
| 
 | 
   196 
 | 
| 
 | 
   197     Returns:
 | 
| 
 | 
   198         str : the name made unique
 | 
| 
 | 
   199     """
 | 
| 
 | 
   200     if str(name_data) == 'Dataset':
 | 
| 
 | 
   201         return str(name_data) + '_' + str(count)
 | 
| 
 | 
   202     else:
 | 
| 
 | 
   203         return str(name_data)
 | 
| 
 | 
   204 
 | 
| 
 | 
   205 ############################ map_methods ######################################
 | 
| 
 | 
   206 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
 | 
| 
 | 
   207 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
 | 
| 
 | 
   208     """
 | 
| 
 | 
   209     Calculates the fold change between two gene expression values.
 | 
| 
 | 
   210 
 | 
| 
 | 
   211     Args:
 | 
| 
 | 
   212         avg1 : average expression value from one dataset avg2 : average expression value from the other dataset
 | 
| 
 | 
   213 
 | 
| 
 | 
   214     Returns:
 | 
| 
 | 
   215         FoldChange :
 | 
| 
 | 
   216             0 : when both input values are 0
 | 
| 
 | 
   217             "-INF" : when avg1 is 0
 | 
| 
 | 
   218             "INF" : when avg2 is 0
 | 
| 
 | 
   219             float : for any other combination of values
 | 
| 
 | 
   220     """
 | 
| 
 | 
   221     if avg1 == 0 and avg2 == 0:
 | 
| 
 | 
   222         return 0
 | 
| 
 | 
   223     elif avg1 == 0:
 | 
| 
 | 
   224         return '-INF'
 | 
| 
 | 
   225     elif avg2 == 0:
 | 
| 
 | 
   226         return 'INF'
 | 
| 
 | 
   227     else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1
 | 
| 
 | 
   228         return (avg1 - avg2) / (abs(avg1) + abs(avg2))
 | 
| 
 | 
   229     
 | 
| 
 | 
   230 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
 | 
| 
 | 
   231     """
 | 
| 
 | 
   232     Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
 | 
| 
 | 
   233 
 | 
| 
 | 
   234     Args:
 | 
| 
 | 
   235         l : current style string of an SVG element
 | 
| 
 | 
   236         col : new value for the "stroke" style property
 | 
| 
 | 
   237         width : new value for the "stroke-width" style property
 | 
| 
 | 
   238         dash : new value for the "stroke-dasharray" style property
 | 
| 
 | 
   239 
 | 
| 
 | 
   240     Returns:
 | 
| 
 | 
   241         str : the fixed style string
 | 
| 
 | 
   242     """
 | 
| 
 | 
   243     tmp = l.split(';')
 | 
| 
 | 
   244     flag_col = False
 | 
| 
 | 
   245     flag_width = False
 | 
| 
 | 
   246     flag_dash = False
 | 
| 
 | 
   247     for i in range(len(tmp)):
 | 
| 
 | 
   248         if tmp[i].startswith('stroke:'):
 | 
| 
 | 
   249             tmp[i] = 'stroke:' + col
 | 
| 
 | 
   250             flag_col = True
 | 
| 
 | 
   251         if tmp[i].startswith('stroke-width:'):
 | 
| 
 | 
   252             tmp[i] = 'stroke-width:' + width
 | 
| 
 | 
   253             flag_width = True
 | 
| 
 | 
   254         if tmp[i].startswith('stroke-dasharray:'):
 | 
| 
 | 
   255             tmp[i] = 'stroke-dasharray:' + dash
 | 
| 
 | 
   256             flag_dash = True
 | 
| 
 | 
   257     if not flag_col:
 | 
| 
 | 
   258         tmp.append('stroke:' + col)
 | 
| 
 | 
   259     if not flag_width:
 | 
| 
 | 
   260         tmp.append('stroke-width:' + width)
 | 
| 
 | 
   261     if not flag_dash:
 | 
| 
 | 
   262         tmp.append('stroke-dasharray:' + dash)
 | 
| 
 | 
   263     return ';'.join(tmp)
 | 
| 
 | 
   264 
 | 
| 
 | 
# The type of d values is collapsed, losing precision, because the dict contains lists instead of tuples, please fix!
def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
    """
    Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.

    Args:
        d : dictionary mapping a p-value, a fold-change value and a z-score (values) to each reaction ID as encoded in the SVG map (keys)
        core_map : SVG map to modify
        threshold_P_V : threshold for a p-value to be considered significant
        threshold_F_C : threshold for a fold change value to be considered significant
        max_z_score : highest z-score (absolute value)
    
    Returns:
        ET.ElementTree : the modified core_map

    Side effects:
        core_map : mut
    """
    maxT = 12  # thickest stroke width
    minT = 2   # thinnest stroke width
    grey = '#BEBEBE'  # insignificant / below fold-change threshold
    blue = '#6495ed'  # down-regulated
    red = '#ecac68'   # up-regulated
    for el in core_map.iter():
        el_id = str(el.get('id'))
        # Reaction arrows are the elements whose ID is prefixed with "R_" in the SVG.
        if el_id.startswith('R_'):
            tmp = d.get(el_id[2:])  # strip the "R_" prefix to look up enrichment data
            if tmp != None:
                p_val :float = tmp[0]
                f_c = tmp[1]       # fold change: float or the sentinels "-INF"/"INF"/0
                z_score = tmp[2]
                
                # Skip reactions with undefined statistics entirely (leave their style untouched).
                if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue

                if p_val <= threshold_P_V:
                    if not isinstance(f_c, str):
                        # Numeric fold change: compare against the normalized threshold.
                        if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # 
                            col = grey
                            width = str(minT)
                        else:
                            # NOTE(review): if f_c == 0 reaches this branch (possible when
                            # threshold_F_C == 1) neither condition sets col — confirm inputs.
                            if f_c < 0:
                                col = blue
                            elif f_c > 0:
                                col = red
                            # Width scales with |z-score| relative to the maximum, clamped below.
                            width = str(max((abs(z_score) * maxT) / max_z_score, minT))
                    else:
                        # Sentinel fold change: one of the two averages was 0.
                        if f_c == '-INF':
                            col = blue
                        elif f_c == 'INF':
                            col = red
                        width = str(maxT)
                    dash = 'none'
                else:
                    # Not significant: dashed thin grey arrow.
                    dash = '5,5'
                    col = grey
                    width = str(minT)
                el.set('style', fix_style(el.get('style', ""), col, width, dash))
    return core_map
 | 
| 
 | 
   323 
 | 
| 
 | 
   324 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
 | 
| 
 | 
   325     """
 | 
| 
 | 
   326     Finds any element in the given map with the given ID. ID uniqueness in an svg file is recommended but
 | 
| 
 | 
   327     not enforced, if more than one element with the exact ID is found only the first will be returned.
 | 
| 
 | 
   328 
 | 
| 
 | 
   329     Args:
 | 
| 
 | 
   330         reactionId (str): exact ID of the requested element.
 | 
| 
 | 
   331         metabMap (ET.ElementTree): metabolic map containing the element.
 | 
| 
 | 
   332 
 | 
| 
 | 
   333     Returns:
 | 
| 
 | 
   334         utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
 | 
| 
 | 
   335     """
 | 
| 
 | 
   336     return utils.Result.Ok(
 | 
| 
 | 
   337         f"//*[@id=\"{reactionId}\"]").map(
 | 
| 
 | 
   338         lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
 | 
| 
 | 
   339         lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
 | 
| 
 | 
   340         # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user.
 | 
| 
 | 
   341 
 | 
| 
 | 
   342 def styleMapElement(element :ET.Element, styleStr :str) -> None:
 | 
| 
 | 
   343     currentStyles :str = element.get("style", "")
 | 
| 
 | 
   344     if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
 | 
| 
 | 
   345         currentStyles = ';'.join(currentStyles.split(';')[:-3])
 | 
| 
 | 
   346 
 | 
| 
 | 
   347     element.set("style", currentStyles + styleStr)
 | 
| 
 | 
   348 
 | 
| 
 | 
   349 class ReactionDirection(Enum):
 | 
| 
 | 
   350     Unknown = ""
 | 
| 
 | 
   351     Direct  = "_F"
 | 
| 
 | 
   352     Inverse = "_B"
 | 
| 
 | 
   353 
 | 
| 
 | 
   354     @classmethod
 | 
| 
 | 
   355     def fromDir(cls, s :str) -> "ReactionDirection":
 | 
| 
 | 
   356         # vvv as long as there's so few variants I actually condone the if spam:
 | 
| 
 | 
   357         if s == ReactionDirection.Direct.value:  return ReactionDirection.Direct
 | 
| 
 | 
   358         if s == ReactionDirection.Inverse.value: return ReactionDirection.Inverse
 | 
| 
 | 
   359         return ReactionDirection.Unknown
 | 
| 
 | 
   360 
 | 
| 
 | 
   361     @classmethod
 | 
| 
 | 
   362     def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
 | 
| 
 | 
   363         return ReactionDirection.fromDir(reactionId[-2:])
 | 
| 
 | 
   364 
 | 
| 
 | 
   365 def getArrowBodyElementId(reactionId :str) -> str:
 | 
| 
 | 
   366     if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
 | 
| 
 | 
   367     elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
 | 
| 
 | 
   368     return f"R_{reactionId}"
 | 
| 
 | 
   369 
 | 
| 
 | 
   370 def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
 | 
| 
 | 
   371     """
 | 
| 
 | 
   372     We attempt extracting the direction information from the provided reaction ID, if unsuccessful we provide the IDs of both directions.
 | 
| 
 | 
   373 
 | 
| 
 | 
   374     Args:
 | 
| 
 | 
   375         reactionId : the provided reaction ID.
 | 
| 
 | 
   376 
 | 
| 
 | 
   377     Returns:
 | 
| 
 | 
   378         Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
 | 
| 
 | 
   379     """
 | 
| 
 | 
   380     if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
 | 
| 
 | 
   381     elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
 | 
| 
 | 
   382     return f"F_{reactionId}", f"B_{reactionId}"
 | 
| 
 | 
   383 
 | 
| 
 | 
   384 class ArrowColor(Enum):
 | 
| 
 | 
   385     """
 | 
| 
 | 
   386     Encodes possible arrow colors based on their meaning in the enrichment process.
 | 
| 
 | 
   387     """
 | 
| 
 | 
   388     Invalid       = "#BEBEBE" # gray, fold-change under treshold
 | 
| 
 | 
   389     UpRegulated   = "#ecac68" # red, up-regulated reaction
 | 
| 
 | 
   390     DownRegulated = "#6495ed" # blue, down-regulated reaction
 | 
| 
 | 
   391 
 | 
| 
 | 
   392     UpRegulatedInv = "#FF0000"
 | 
| 
 | 
   393     # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with
 | 
| 
 | 
   394     # conflicting enrichment in the two directions.
 | 
| 
 | 
   395 
 | 
| 
 | 
   396     DownRegulatedInv = "#0000FF"
 | 
| 
 | 
   397     # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with
 | 
| 
 | 
   398     # conflicting enrichment in the two directions.
 | 
| 
 | 
   399 
 | 
| 
 | 
   400     @classmethod
 | 
| 
 | 
   401     def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
 | 
| 
 | 
   402         colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv)
 | 
| 
 | 
   403         return colors[useAltColor]
 | 
| 
 | 
   404 
 | 
| 
 | 
   405     def __str__(self) -> str: return self.value
 | 
| 
 | 
   406 
 | 
| 
 | 
   407 class Arrow:
 | 
| 
 | 
   408     """
 | 
| 
 | 
   409     Models the properties of a reaction arrow that change based on enrichment.
 | 
| 
 | 
   410     """
 | 
| 
 | 
   411     MIN_W = 2
 | 
| 
 | 
   412     MAX_W = 12
 | 
| 
 | 
   413 
 | 
| 
 | 
   414     def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
 | 
| 
 | 
   415         """
 | 
| 
 | 
   416         (Private) Initializes an instance of Arrow.
 | 
| 
 | 
   417 
 | 
| 
 | 
   418         Args:
 | 
| 
 | 
   419             width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
 | 
| 
 | 
   420             col : color of the arrow.
 | 
| 
 | 
   421             isDashed : whether the arrow should be dashed, meaning the associated pValue resulted not significant.
 | 
| 
 | 
   422         
 | 
| 
 | 
   423         Returns:
 | 
| 
 | 
   424             None : practically, a Arrow instance.
 | 
| 
 | 
   425         """
 | 
| 
 | 
   426         self.w    = width
 | 
| 
 | 
   427         self.col  = col
 | 
| 
 | 
   428         self.dash = isDashed
 | 
| 
 | 
   429     
 | 
| 
 | 
   430     def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
 | 
| 
 | 
   431         if getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr)).isErr:
 | 
| 
 | 
   432             ERRORS.append(reactionId)
 | 
| 
 | 
   433 
 | 
| 
 | 
   434     def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
 | 
| 
 | 
   435         # If We're dealing with RAS data or in general don't care about the direction of the reaction we only style the arrow body
 | 
| 
 | 
   436         if not mindReactionDir:
 | 
| 
 | 
   437             return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
 | 
| 
284
 | 
   438         
 | 
| 
4
 | 
   439         # Now we style the arrow head(s):
 | 
| 
 | 
   440         idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
 | 
| 
 | 
   441         self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
 | 
| 
 | 
   442         if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
 | 
| 
 | 
   443     
 | 
| 
 | 
   444     def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
 | 
| 
 | 
   445         """
 | 
| 
 | 
   446         Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
 | 
| 
 | 
   447 
 | 
| 
 | 
   448         Args:
 | 
| 
 | 
   449             reactionId: the reaction ID, as encoded in the dataset.
 | 
| 
 | 
   450             mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
 | 
| 
 | 
   451     
 | 
| 
 | 
   452         Returns:
 | 
| 
 | 
   453             str : the ID of an arrow's body or tips in the map.
 | 
| 
 | 
   454         """
 | 
| 
 | 
   455         # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
 | 
| 
 | 
   456         if not mindReactionDir: return "R_" + reactionId
 | 
| 
 | 
   457 
 | 
| 
 | 
   458         #TODO: this is clearly something we need to make consistent in RPS
 | 
| 
 | 
   459         return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[:-2] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
 | 
| 
 | 
   460 
 | 
| 
 | 
   461     def toStyleStr(self, *, downSizedForTips = False) -> str:
 | 
| 
 | 
   462         """
 | 
| 
 | 
   463         Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
 | 
| 
 | 
   464 
 | 
| 
 | 
   465         Returns:
 | 
| 
 | 
   466             str : the styles string.
 | 
| 
 | 
   467         """
 | 
| 
 | 
   468         width = self.w
 | 
| 
 | 
   469         if downSizedForTips: width *= 0.8
 | 
| 
 | 
   470         return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
 | 
| 
 | 
   471 
 | 
| 
 | 
# Shared default arrows, kept at module level (rather than as Arrow class attributes,
# which cannot reference the class inside its own body):
INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)  # thin gray: fold change below threshold
INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)  # dashed thin gray: p-value not significant
 | 
| 
 | 
   476 
 | 
| 
 | 
def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
    """
    Applies RPS enrichment results to the provided metabolic map.

    Reads the significance thresholds and flags from the module-level ARGS namespace
    (pValue, fChange, net, using_RAS), which must be populated before calling.

    Args:
        rpsEnrichmentRes : RPS enrichment results.
        metabMap : the metabolic map to edit.
        maxNumericZScore : biggest finite z-score value found.
    
    Side effects:
        metabMap : mut
    
    Returns:
        None
    """
    for reactionId, values in rpsEnrichmentRes.items():
        pValue = values[0]
        foldChange = values[1]
        z_score = values[2]

        # Undefined statistics: leave this reaction's styling untouched.
        if math.isnan(pValue) or (isinstance(foldChange, float) and math.isnan(foldChange)): continue

        # Sentinel "-INF"/"INF" strings parse to real float infinities here.
        if isinstance(foldChange, str): foldChange = float(foldChange)
        if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow
            INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
            continue

        # Fold change below the normalized threshold: gray arrow.
        if abs(foldChange) <  (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
            INVALID_ARROW.styleReactionElements(metabMap, reactionId)
            continue
        
        # Width scales with |z-score| relative to the maximum; infinite fold changes get max width.
        width = Arrow.MAX_W
        if not math.isinf(foldChange):
            try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W)
            except ZeroDivisionError: pass
        
        if not reactionId.endswith("_RV"): # RV stands for reversible reactions
            Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
            continue
        
        reactionId = reactionId[:-3] # Remove "_RV"
        
        # NOTE(review): values[4] requires a 5-element tuple, but the annotated type above
        # declares at most 4 elements — confirm the actual shape produced by the caller.
        inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score
        if inversionScore == 2: foldChange *= -1
        # ^^^ Style the inverse direction with the opposite sign netValue
        
        # If the score is 1 (opposite signs) we use alternative colors vvv
        arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
        
        # vvv These 2 if statements can both be true and can both happen
        if ARGS.net: # style arrow head(s):
            arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))
        
        if not ARGS.using_RAS: # style arrow body
            arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
 | 
| 
 | 
   532 
 | 
| 
 | 
   533 ############################ split class ######################################
 | 
| 
 | 
   534 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
 | 
| 
 | 
   535     """
 | 
| 
 | 
   536     Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
 | 
| 
 | 
   537 
 | 
| 
 | 
   538     Args:
 | 
| 
 | 
   539         classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
 | 
| 
 | 
   540         resolve_rules : a :dict containing :float data
 | 
| 
 | 
   541 
 | 
| 
 | 
   542     Returns:
 | 
| 
 | 
   543         dict : the dict with data grouped by class
 | 
| 
 | 
   544 
 | 
| 
 | 
   545     Side effects:
 | 
| 
 | 
   546         classes : mut
 | 
| 
 | 
   547     """
 | 
| 
 | 
   548     class_pat :Dict[str, List[List[float]]] = {}
 | 
| 
 | 
   549     for i in range(len(classes)):
 | 
| 
 | 
   550         classe :str = classes.iloc[i, 1]
 | 
| 
 | 
   551         if pd.isnull(classe): continue
 | 
| 
 | 
   552 
 | 
| 
 | 
   553         l :List[List[float]] = []
 | 
| 
 | 
   554         for j in range(i, len(classes)):
 | 
| 
 | 
   555             if classes.iloc[j, 1] == classe:
 | 
| 
 | 
   556                 pat_id :str = classes.iloc[j, 0]
 | 
| 
 | 
   557                 tmp = resolve_rules.get(pat_id, None)
 | 
| 
 | 
   558                 if tmp != None:
 | 
| 
 | 
   559                     l.append(tmp)
 | 
| 
 | 
   560                 classes.iloc[j, 1] = None
 | 
| 
 | 
   561         
 | 
| 
 | 
   562         if l:
 | 
| 
 | 
   563             class_pat[classe] = list(map(list, zip(*l)))
 | 
| 
 | 
   564             continue
 | 
| 
 | 
   565         
 | 
| 
 | 
   566         utils.logWarning(
 | 
| 
 | 
   567             f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)
 | 
| 
 | 
   568     
 | 
| 
 | 
   569     return class_pat
 | 
| 
 | 
   570 
 | 
| 
 | 
   571 ############################ conversion ##############################################
 | 
| 
 | 
   572 #conversion from svg to png 
 | 
| 
 | 
def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
    """
    Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.

    Args:
        svg_path : path to SVG file
        png_path : path for new PNG file
        dpi : dots per inch of the generated PNG
        scale : scaling factor for the generated PNG, computed internally when a size is provided
        size : final effective width of the generated PNG

    Returns:
        None
    """
    if size:
        # Load at native size, then derive the scale factor needed to reach the requested width.
        image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
        scale = size / image.width
        image = image.resize(scale)
    else:
        image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)

    # Opaque white canvas created at the (already scaled) image dimensions.
    white_background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
    # NOTE(review): this affine scales the background a second time, making it larger
    # than the image whenever scale != 1 (the canvas above is already at the image's
    # final size). Looks redundant — confirm against rendered output before changing.
    white_background = white_background.affine([scale, 0, 0, scale])

    # Keep only the first band when band counts differ, so composite2 can blend them.
    if white_background.bands != image.bands:
        white_background = white_background.extract_band(0)

    # Composite the SVG render over the white canvas and write out the PNG.
    composite_image = white_background.composite2(image, 'over')
    composite_image.write_to_file(png_path.show())
 | 
| 
 | 
   602 
 | 
| 
 | 
#single function: the files are kept outside and passed in as inputs
 | 
| 
 | 
   604 #conversion from png to pdf
 | 
| 
 | 
   605 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
 | 
| 
 | 
   606     """
 | 
| 
 | 
   607     Internal utility to convert a PNG to PDF to aid from SVG conversion.
 | 
| 
 | 
   608 
 | 
| 
 | 
   609     Args:
 | 
| 
 | 
   610         png_file : path to PNG file
 | 
| 
 | 
   611         pdf_file : path to new PDF file
 | 
| 
 | 
   612 
 | 
| 
 | 
   613     Returns:
 | 
| 
 | 
   614         None
 | 
| 
 | 
   615     """
 | 
| 
 | 
   616     image = Image.open(png_file.show())
 | 
| 
 | 
   617     image = image.convert("RGB")
 | 
| 
 | 
   618     image.save(pdf_file.show(), "PDF", resolution=100.0)
 | 
| 
 | 
   619 
 | 
| 
 | 
   620 #function called to reduce redundancy in the code
 | 
| 
 | 
   621 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
 | 
| 
 | 
   622     """
 | 
| 
 | 
   623     Converts the SVG map at the provided path to PDF.
 | 
| 
 | 
   624 
 | 
| 
 | 
   625     Args:
 | 
| 
 | 
   626         file_svg : path to SVG file
 | 
| 
 | 
   627         file_png : path to PNG file
 | 
| 
 | 
   628         file_pdf : path to new PDF file
 | 
| 
 | 
   629 
 | 
| 
 | 
   630     Returns:
 | 
| 
 | 
   631         None
 | 
| 
 | 
   632     """
 | 
| 
 | 
   633     svg_to_png_with_background(file_svg, file_png)
 | 
| 
 | 
   634     try:
 | 
| 
 | 
   635         convert_png_to_pdf(file_png, file_pdf)
 | 
| 
 | 
   636         print(f'PDF file {file_pdf.filePath} successfully generated.')
 | 
| 
 | 
   637     
 | 
| 
 | 
   638     except Exception as e:
 | 
| 
 | 
   639         raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
 | 
| 
 | 
   640 
 | 
| 
 | 
   641 ############################ map ##############################################
 | 
| 
 | 
   642 def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
 | 
| 
 | 
   643     """
 | 
| 
 | 
   644     Builds a FilePath instance from the names of confronted datasets ready to point to a location in the
 | 
| 
 | 
   645     "result/" folder, used by this tool for output files in collections.
 | 
| 
 | 
   646 
 | 
| 
 | 
   647     Args:
 | 
| 
 | 
   648         dataset1Name : _description_
 | 
| 
 | 
   649         dataset2Name : _description_. Defaults to "rest".
 | 
| 
 | 
   650         details : _description_
 | 
| 
 | 
   651         ext : _description_
 | 
| 
 | 
   652 
 | 
| 
 | 
   653     Returns:
 | 
| 
 | 
   654         utils.FilePath : _description_
 | 
| 
 | 
   655     """
 | 
| 
 | 
   656     # This function returns a util data structure but is extremely specific to this module.
 | 
| 
 | 
   657     # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
 | 
| 
 | 
   658     # TODO: until a third tool with multiple outputs appears before porting this to utils.
 | 
| 
 | 
   659     return utils.FilePath(
 | 
| 
 | 
   660         f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
 | 
| 
 | 
   661         # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
 | 
| 
 | 
   662         # all output files: I don't care, this was never the performance bottleneck of the tool and
 | 
| 
 | 
   663         # there is no other net gain in saving and re-using the built string.
 | 
| 
 | 
   664         ext,
 | 
| 
146
 | 
   665         prefix = ARGS.output_path)
 | 
| 
4
 | 
   666 
 | 
| 
 | 
   667 FIELD_NOT_AVAILABLE = '/'
 | 
| 
 | 
   668 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
 | 
| 
 | 
   669     fieldsAmt = len(fieldNames)
 | 
| 
 | 
   670     with open(outPath.show(), "w", newline = "") as fd:
 | 
| 
 | 
   671         writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
 | 
| 
 | 
   672         writer.writeheader()
 | 
| 
 | 
   673         
 | 
| 
 | 
   674         for row in rows:
 | 
| 
 | 
   675             sizeMismatch = fieldsAmt - len(row)
 | 
| 
 | 
   676             if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
 | 
| 
 | 
   677             writer.writerow({ field : data for field, data in zip(fieldNames, row) })
 | 
| 
 | 
   678 
 | 
| 
 | 
   679 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
 | 
| 
 | 
   680 def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None:
 | 
| 
279
 | 
   681     fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"]
 | 
| 
4
 | 
   682 
 | 
| 
 | 
   683     writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
 | 
| 
 | 
   684 
 | 
| 
 | 
   685 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
 | 
| 
 | 
   686     # this function compiles the things always in common between comparison modes after enrichment.
 | 
| 
 | 
   687     # TODO: organize, name better.
 | 
| 
278
 | 
   688     suffix = "RAS" if ras_enrichment else "RPS"
 | 
| 
 | 
   689     details = f"Tabular Result ({suffix})"
 | 
| 
 | 
   690 
 | 
| 
 | 
   691     writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV))
 | 
| 
4
 | 
   692     
 | 
| 
 | 
   693     if ras_enrichment:
 | 
| 
 | 
   694         fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score)
 | 
| 
 | 
   695         return
 | 
| 
 | 
   696 
 | 
| 
 | 
   697     for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
 | 
| 
 | 
   698     applyRpsEnrichmentToMap(tmp, core_map, max_z_score)
 | 
| 
 | 
   699 
 | 
| 
 | 
   700 def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
 | 
| 
 | 
   701     """
 | 
| 
 | 
   702     Computes the statistical significance score (P-value) of the comparison between coherent data
 | 
| 
 | 
   703     from two datasets. The data is supposed to, in both datasets:
 | 
| 
 | 
   704     - be related to the same reaction ID;
 | 
| 
 | 
   705     - be ordered by sample, such that the item at position i in both lists is related to the
 | 
| 
 | 
   706       same sample or cell line.
 | 
| 
 | 
   707 
 | 
| 
 | 
   708     Args:
 | 
| 
 | 
   709         dataset1Data : data from the 1st dataset.
 | 
| 
 | 
   710         dataset2Data : data from the 2nd dataset.
 | 
| 
 | 
   711 
 | 
| 
 | 
   712     Returns:
 | 
| 
 | 
   713         tuple: (P-value, Z-score)
 | 
| 
 | 
   714             - P-value from a Kolmogorov-Smirnov test on the provided data.
 | 
| 
 | 
   715             - Z-score of the difference between means of the two datasets.
 | 
| 
 | 
   716     """
 | 
| 
 | 
   717     # Perform Kolmogorov-Smirnov test
 | 
| 
 | 
   718     ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data)
 | 
| 
 | 
   719     
 | 
| 
 | 
   720     # Calculate means and standard deviations
 | 
| 
 | 
   721     mean1 = np.mean(dataset1Data)
 | 
| 
 | 
   722     mean2 = np.mean(dataset2Data)
 | 
| 
 | 
   723     std1 = np.std(dataset1Data, ddof=1)
 | 
| 
 | 
   724     std2 = np.std(dataset2Data, ddof=1)
 | 
| 
 | 
   725     
 | 
| 
 | 
   726     n1 = len(dataset1Data)
 | 
| 
 | 
   727     n2 = len(dataset2Data)
 | 
| 
 | 
   728     
 | 
| 
 | 
   729     # Calculate Z-score
 | 
| 
 | 
   730     z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))
 | 
| 
 | 
   731     
 | 
| 
 | 
   732     return p_value, z_score
 | 
| 
 | 
   733 
 | 
| 
 | 
def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
    """
    Compares two datasets reaction by reaction, computing p-value, fold change,
    z-score and the two averages for each reaction id.

    Args:
        dataset1Data : values from the first dataset, one inner list per reaction, aligned with ids.
        dataset2Data : values from the second dataset, same layout as dataset1Data.
        ids : reaction ids; entries consumed by a net-score pairing are set to None in place (mut).

    Returns:
        Tuple[Dict[str, List[Union[float, FoldChange]]], float]: per-reaction scores
        as [p-value, fold change, z-score, average_1, average_2], plus the maximum
        absolute z-score seen among the regular (non-net) scores.
    """
    #TODO: the following code still suffers from "dumbvarnames-osis"
    tmp :Dict[str, List[Union[float, FoldChange]]] = {}
    count   = 0
    max_z_score = 0

    for l1, l2 in zip(dataset1Data, dataset2Data):
        reactId = ids[count]
        count += 1
        if not reactId: continue # we skip ids that have already been processed

        try: #TODO: identify the source of these errors and minimize code in the try block
            reactDir = ReactionDirection.fromReactionId(reactId)
            # Net score is computed only for reversible reactions when user wants it on arrow tips or when RAS datasets aren't used
            if (ARGS.net or not ARGS.using_RAS) and reactDir is not ReactionDirection.Unknown:
                try: position = ids.index(reactId[:-1] + ('B' if reactDir is ReactionDirection.Direct else 'F'))
                except ValueError: continue # we look for the complementary id, if not found we skip

                # Net flux per sample: forward minus backward direction values.
                nets1 = np.subtract(l1, dataset1Data[position])
                nets2 = np.subtract(l2, dataset2Data[position])

                p_value, z_score = computePValue(nets1, nets2)
                avg1 = sum(nets1)   / len(nets1)
                avg2 = sum(nets2)   / len(nets2)
                net = fold_change(avg1, avg2)
                
                if math.isnan(net): continue
                # Net scores are stored under a synthetic "<base id>RV" (reversible) key.
                tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2]
                
                # vvv complementary directional ids are set to None once processed if net is to be applied to tips
                if ARGS.net:
                    ids[position] = None
                    continue

            # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used
            p_value, z_score = computePValue(l1, l2)
            avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2))
            # NOTE(review): the str guard suggests fold_change/z_score may carry string
            # markers in some paths — defined elsewhere in the project; confirm.
            if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
            tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)]
        
        # Division by an empty/zero group or malformed values voids this reaction's scores.
        except (TypeError, ZeroDivisionError): continue
    
    return tmp, max_z_score
 | 
| 
 | 
   777 
 | 
| 
151
 | 
   778 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]:
 | 
| 
4
 | 
   779     """
 | 
| 
 | 
   780     Compares clustered data based on a given comparison mode and applies enrichment-based styling on the
 | 
| 
 | 
   781     provided metabolic map.
 | 
| 
 | 
   782 
 | 
| 
 | 
   783     Args:
 | 
| 
 | 
   784         class_pat : the clustered data.
 | 
| 
 | 
   785         ids : ids for data association.
 | 
| 
 | 
   786         fromRAS : whether the data to enrich consists of RAS scores.
 | 
| 
 | 
   787 
 | 
| 
 | 
   788     Returns:
 | 
| 
143
 | 
   789         List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score.
 | 
| 
 | 
   790         
 | 
| 
4
 | 
   791     Raises:
 | 
| 
 | 
   792         sys.exit : if there are less than 2 classes for comparison
 | 
| 
 | 
   793     """
 | 
| 
143
 | 
   794     class_pat = {k.strip(): v for k, v in class_pat.items()}
 | 
| 
 | 
   795     if (not class_pat) or (len(class_pat.keys()) < 2):
 | 
| 
 | 
   796         sys.exit('Execution aborted: classes provided for comparisons are less than two\n')
 | 
| 
 | 
   797     
 | 
| 
 | 
   798     enrichment_results = []
 | 
| 
4
 | 
   799 
 | 
| 
 | 
   800     if ARGS.comparison == "manyvsmany":
 | 
| 
 | 
   801         for i, j in it.combinations(class_pat.keys(), 2):
 | 
| 
 | 
   802             comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
 | 
| 
143
 | 
   803             enrichment_results.append((i, j, comparisonDict, max_z_score))
 | 
| 
4
 | 
   804     
 | 
| 
 | 
   805     elif ARGS.comparison == "onevsrest":
 | 
| 
 | 
   806         for single_cluster in class_pat.keys():
 | 
| 
143
 | 
   807             rest = [item for k, v in class_pat.items() if k != single_cluster for item in v]
 | 
| 
4
 | 
   808             comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
 | 
| 
143
 | 
   809             enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score))
 | 
| 
4
 | 
   810     
 | 
| 
 | 
   811     elif ARGS.comparison == "onevsmany":
 | 
| 
 | 
   812         controlItems = class_pat.get(ARGS.control)
 | 
| 
 | 
   813         for otherDataset in class_pat.keys():
 | 
| 
143
 | 
   814             if otherDataset == ARGS.control:
 | 
| 
 | 
   815                 continue
 | 
| 
4
 | 
   816             comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
 | 
| 
143
 | 
   817             enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score))
 | 
| 
 | 
   818     
 | 
| 
 | 
   819     return enrichment_results
 | 
| 
4
 | 
   820 
 | 
| 
143
 | 
   821 def createOutputMaps(dataset1Name: str, dataset2Name: str, core_map: ET.ElementTree) -> None:
 | 
| 
 | 
   822     svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG)
 | 
| 
4
 | 
   823     utils.writeSvg(svgFilePath, core_map)
 | 
| 
 | 
   824 
 | 
| 
 | 
   825     if ARGS.generate_pdf:
 | 
| 
143
 | 
   826         pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG)
 | 
| 
 | 
   827         pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF)
 | 
| 
 | 
   828         convert_to_pdf(svgFilePath, pngPath, pdfPath)
 | 
| 
4
 | 
   829 
 | 
| 
143
 | 
   830     if not ARGS.generate_svg:
 | 
| 
145
 | 
   831         os.remove(svgFilePath.show())
 | 
| 
4
 | 
   832 
 | 
| 
 | 
   833 ClassPat = Dict[str, List[List[float]]]
 | 
| 
 | 
   834 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
 | 
| 
 | 
   835     # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
 | 
| 
 | 
   836     # for the sake of everyone's sanity.
 | 
| 
 | 
   837     class_pat :ClassPat = {}
 | 
| 
 | 
   838     if ARGS.option == 'datasets':
 | 
| 
 | 
   839         num = 1 #TODO: the dataset naming function could be a generator
 | 
| 
 | 
   840         for path, name in zip(datasetsPaths, names):
 | 
| 
 | 
   841             name = name_dataset(name, num)
 | 
| 
 | 
   842             resolve_rules_float, ids = getDatasetValues(path, name)
 | 
| 
 | 
   843             if resolve_rules_float != None:
 | 
| 
 | 
   844                 class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
 | 
| 
 | 
   845         
 | 
| 
 | 
   846             num += 1
 | 
| 
 | 
   847     
 | 
| 
 | 
   848     elif ARGS.option == "dataset_class":
 | 
| 
 | 
   849         classes = read_dataset(classPath, "class")
 | 
| 
 | 
   850         classes = classes.astype(str)
 | 
| 
 | 
   851 
 | 
| 
 | 
   852         resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
 | 
| 
 | 
   853         if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float)
 | 
| 
 | 
   854     
 | 
| 
 | 
   855     return ids, class_pat
 | 
| 
 | 
   856     #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
 | 
| 
 | 
   857 
 | 
| 
 | 
   858 #TODO: create these damn args as FilePath objects
 | 
| 
 | 
   859 def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
 | 
| 
 | 
   860     """
 | 
| 
 | 
   861     Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
 | 
| 
 | 
   862 
 | 
| 
 | 
   863     Args:
 | 
| 
 | 
   864         datasetPath : path to the dataset
 | 
| 
 | 
   865         datasetName (str): dataset name, used in error reporting
 | 
| 
 | 
   866 
 | 
| 
 | 
   867     Returns:
 | 
| 
 | 
   868         Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
 | 
| 
 | 
   869     """
 | 
| 
 | 
   870     dataset = read_dataset(datasetPath, datasetName)
 | 
| 
 | 
   871     IDs = pd.Series.tolist(dataset.iloc[:, 0].astype(str))
 | 
| 
 | 
   872 
 | 
| 
 | 
   873     dataset = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
 | 
| 
 | 
   874     return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
 | 
| 
 | 
   875 
 | 
| 
 | 
   876 ############################ MAIN #############################################
 | 
| 
147
 | 
def main(args:List[str] = None) -> None:
    """
    Initializes everything and sets the program in motion based on the fronted input arguments.

    Args:
        args : command-line arguments; None means sys.argv is used (see process_args).

    Returns:
        None
    
    Raises:
        sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
    """
    global ARGS
    ARGS = process_args(args)

    if not os.path.isdir(ARGS.output_path):
        os.makedirs(ARGS.output_path)
    
    # Load the base map, optionally from a user-provided custom file.
    core_map: ET.ElementTree = ARGS.choice_map.getMap(
        ARGS.tool_dir,
        utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
    
    # Prepare enrichment results containers
    ras_results = []
    rps_results = []

    # Compute RAS enrichment if requested
    if ARGS.using_RAS:
        ids_ras, class_pat_ras = getClassesAndIdsFromDatasets(
            ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names)
        ras_results = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True)

    # Compute RPS enrichment if requested
    if ARGS.using_RPS:
        ids_rps, class_pat_rps = getClassesAndIdsFromDatasets(
            ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps)
        rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False)

    # Organize by comparison pairs: RAS entries first, then RPS entries merged in
    # (an RPS-only pair gets a dict without a 'ras' key; res.get below handles it).
    comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {}
    for i, j, d, mz in ras_results:
        comparisons[(i, j)] = {'ras': (d, mz), 'rps': None}
    for i, j, d, mz in rps_results:
        comparisons.setdefault((i, j), {}).update({'rps': (d, mz)})

    # For each comparison, create a styled map with RAS bodies and RPS heads
    for (i, j), res in comparisons.items():
        # Deep copy so each comparison styles a fresh map, not the shared base.
        map_copy = copy.deepcopy(core_map)

        # Apply RAS styling to arrow bodies
        if res.get('ras'):
            tmp_ras, max_z_ras = res['ras']
            temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True)

        # Apply RPS styling to arrow heads
        if res.get('rps'):
            tmp_rps, max_z_rps = res['rps']
            # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed
            temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False)

        # Output both SVG and PDF/PNG as configured
        createOutputMaps(i, j, map_copy)
    print('Execution succeeded')
 | 
| 
4
 | 
   938 ###############################################################################
 | 
| 
 | 
# Script entry point: run the tool with sys.argv when invoked directly.
if __name__ == "__main__":
    main()
 |