Mercurial > repos > bimib > marea_2
changeset 165:244e2f302c68 draft
Uploaded
| author | francesco_lapi | 
|---|---|
| date | Wed, 24 Jul 2024 16:13:10 +0000 | 
| parents | ce6b7021c3d1 | 
| children | e19d829063b2 | 
| files | marea_2/marea.py | 
| diffstat | 1 files changed, 48 insertions(+), 29 deletions(-) [+] | 
line wrap: on
 line diff
--- a/marea_2/marea.py Tue Jul 23 14:10:17 2024 +0000 +++ b/marea_2/marea.py Wed Jul 24 16:13:10 2024 +0000 @@ -256,7 +256,7 @@ return ';'.join(tmp) # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! -def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_F_C :float) -> ET.ElementTree: +def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: """ Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. @@ -265,7 +265,7 @@ core_map : SVG map to modify threshold_P_V : threshold for a p-value to be considered significant threshold_F_C : threshold for a fold change value to be considered significant - max_F_C : highest fold change (absolute value) + max_z_score : highest z-score (absolute value) Returns: ET.ElementTree : the modified core_map @@ -276,8 +276,8 @@ maxT = 12 minT = 2 grey = '#BEBEBE' - blue = '#0000FF' - red = '#E41A1C' + blue = '#6495ed' + red = '#ecac68' for el in core_map.iter(): el_id = str(el.get('id')) if el_id.startswith('R_'): @@ -285,6 +285,7 @@ if tmp != None: p_val :float = tmp[0] f_c = tmp[1] + z_score = tmp[2] if p_val < threshold_P_V: if not isinstance(f_c, str): if abs(f_c) < math.log(threshold_F_C, 2): @@ -295,7 +296,7 @@ col = blue elif f_c > 0: col = red - width = str(max((abs(f_c) * maxT) / max_F_C, minT)) + width = str(max((abs(z_score) * maxT) / max_z_score, minT)) else: if f_c == '-INF': col = blue @@ -375,8 +376,8 @@ Encodes possible arrow colors based on their meaning in the enrichment process. """ Invalid = "#BEBEBE" # gray, fold-change under treshold - UpRegulated = "#E41A1C" # red, up-regulated reaction - DownRegulated = "#0000FF" # blue, down-regulated reaction + UpRegulated = "#ecac68" # red, up-regulated reaction + DownRegulated = "#6495ed" # blue, down-regulated reaction UpRegulatedInv = "#FF7A00" # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with @@ -463,14 +464,14 @@ INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) -def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericFoldChange :float) -> None: +def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: """ Applies RPS enrichment results to the provided metabolic map. Args: rpsEnrichmentRes : RPS enrichment results. metabMap : the metabolic map to edit. - maxNumericFoldChange : biggest finite fold-change value found. + maxNumericZScore : biggest finite z-score value found. Side effects: metabMap : mut @@ -481,6 +482,7 @@ for reactionId, values in rpsEnrichmentRes.items(): pValue = values[0] foldChange = values[1] + z_score = values[2] if isinstance(foldChange, str): foldChange = float(foldChange) if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow @@ -493,7 +495,7 @@ width = Arrow.MAX_W if not math.isinf(foldChange): - try: width = max(abs(foldChange * Arrow.MAX_W) / maxNumericFoldChange, Arrow.MIN_W) + try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) except ZeroDivisionError: pass if not reactionId.endswith("_RV"): # RV stands for reversible reactions @@ -669,19 +671,19 @@ writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath) -def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_F_C :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: +def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: # this function compiles the things always in common between comparison modes after enrichment. # TODO: organize, name better. writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV)) if ras_enrichment: - fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_F_C) + fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) return for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData) - applyRpsEnrichmentToMap(tmp, core_map, max_F_C) + applyRpsEnrichmentToMap(tmp, core_map, max_z_score) -def computePValue(dataset1Data :List[float], dataset2Data :List[float]) -> float: +def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]: """ Computes the statistical significance score (P-value) of the comparison between coherent data from two datasets. The data is supposed to, in both datasets: @@ -694,15 +696,32 @@ dataset2Data : data from the 2nd dataset. Returns: - float: P-value from a Kolmogorov-Smirnov test on the provided data. + tuple: (P-value, Z-score) + - P-value from a Kolmogorov-Smirnov test on the provided data. + - Z-score of the difference between means of the two datasets. """ - return st.ks_2samp(dataset1Data, dataset2Data)[1] + # Perform Kolmogorov-Smirnov test + ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data) + + # Calculate means and standard deviations + mean1 = np.mean(dataset1Data) + mean2 = np.mean(dataset2Data) + std1 = np.std(dataset1Data, ddof=1) + std2 = np.std(dataset2Data, ddof=1) + + n1 = len(dataset1Data) + n2 = len(dataset2Data) + + # Calculate Z-score + z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2)) + + return p_value, z_score def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: #TODO: the following code still suffers from "dumbvarnames-osis" tmp :Dict[str, List[Union[float, FoldChange]]] = {} count = 0 - max_F_C = 0 + max_z_score = 0 for l1, l2 in zip(dataset1Data, dataset2Data): reactId = ids[count] @@ -719,13 +738,13 @@ nets1 = np.subtract(l1, dataset1Data[position]) nets2 = np.subtract(l2, dataset2Data[position]) - p_value = computePValue(nets1, nets2) + p_value, z_score = computePValue(nets1, nets2) avg1 = sum(nets1) / len(nets1) avg2 = sum(nets2) / len(nets2) net = fold_change(avg1, avg2) if math.isnan(net): continue - tmp[reactId[:-1] + "RV"] = [p_value, net, avg1, avg2] + tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] # vvv complementary directional ids are set to None once processed if net is to be applied to tips if ARGS.net: @@ -733,14 +752,14 @@ continue # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used - p_value = computePValue(l1, l2) + p_value, z_score = computePValue(l1, l2) avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) - if not isinstance(avg, str) and max_F_C < abs(avg): max_F_C = abs(avg) - tmp[reactId] = [float(p_value), avg] + if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) + tmp[reactId] = [float(p_value), avg, z_score] except (TypeError, ZeroDivisionError): continue - return tmp, max_F_C + return tmp, max_z_score def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str], *, fromRAS = True) -> None: """ @@ -771,8 +790,8 @@ for i, j in it.combinations(class_pat.keys(), 2): #TODO: these 2 functions are always called in pair and in this order and need common data, # some clever refactoring would be appreciated. - comparisonDict, max_F_C = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) - temp_thingsInCommon(comparisonDict, metabMap, max_F_C, i, j, fromRAS) + comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) + temp_thingsInCommon(comparisonDict, metabMap, max_z_score, i, j, fromRAS) elif ARGS.comparison == "onevsrest": for single_cluster in class_pat.keys(): @@ -785,16 +804,16 @@ for i in t: rest = rest + i - comparisonDict, max_F_C = compareDatasetPair(class_pat.get(single_cluster), rest, ids) - temp_thingsInCommon(comparisonDict, metabMap, max_F_C, single_cluster, fromRAS) + comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids) + temp_thingsInCommon(comparisonDict, metabMap, max_z_score, single_cluster, fromRAS) elif ARGS.comparison == "onevsmany": controlItems = class_pat.get(ARGS.control) for otherDataset in class_pat.keys(): if otherDataset == ARGS.control: continue - comparisonDict, max_F_C = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) - temp_thingsInCommon(comparisonDict, metabMap, max_F_C, ARGS.control, otherDataset, fromRAS) + comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) + temp_thingsInCommon(comparisonDict, metabMap, max_z_score, ARGS.control, otherDataset, fromRAS) def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None: svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG)
