Mercurial > repos > md-anderson-bioinformatics > matrix_manipulation
view Matrix_Transformations.py @ 1:f1bcd79cd923 draft default tip
Uploaded
author | insilico-bob |
---|---|
date | Tue, 27 Nov 2018 14:20:40 -0500 |
parents | |
children |
line wrap: on
line source
''' Created on Jun 6, 2017 updated Feb 2018 @author: cjacoby and Bob Brown ''' import os import sys, traceback, argparse import numpy as np from numpy import size, array import warnings from Matrix_Validate_import import reader #import scipy.stats as ss warnings.filterwarnings('error') #Define argparse Function def get_args(): parser = argparse.ArgumentParser() parser.add_argument('input_file_txt', help='text file input matrix(include .txt in name)') parser.add_argument('choice', type=str, help='Choose normalization Method: 1 = Z-score, 2 = Mean Centered, 3 = log2, 4= rank') parser.add_argument('axes', type=str, help='Choose Axis to normalize On (Row or Column)') parser.add_argument('scalevalue', help='optional scaling factor for matrix)') parser.add_argument('offsetvalue', help='optional offset for matrix') parser.add_argument('output_file_txt', help='text file output matrix(include .txt in name)') args = parser.parse_args() return args def Zscore_row(matrix): #Loop To Perform Z-Score normalization for i in range(0,len(matrix)): temp_mean = np.nanmean(matrix[i]) temp_stdev = np.nanstd(matrix[i],ddof=1) for j in range(0,len(matrix[0])): matrix[i][j] = (matrix[i][j]-temp_mean)/temp_stdev return(matrix) #Define Z-Score normalization Function def Zscore_col(matrix): #Loop To Perform Z-Score normalization for i in range(len(matrix[0])): # matrix[:][i] = [scaleValue*x+offset for x in matrix[i]] temp_mean = np.nanmean([row[i] for row in matrix]) temp_stdev = np.nanstd([row[i] for row in matrix],ddof=1) #Probably Should Have if statement checking if stdev equals zero, although this implies the data is already Z-score normalized for j in range(len(matrix)): matrix[j][i] = (matrix[j][i]-temp_mean)/temp_stdev return(matrix) #Define Mean Centered or Median centered normalization Function def MeanMedianCenter_row(matrix,type): #Loop To Perform mean or median center for i in range(0,len(matrix)): if type == "mean": temp_type = np.nanmean(matrix[i][1::]) else: temp_type = np.nanmedian(matrix[i][1::]) for j in range(0,len(matrix[0])): matrix[i][j] = (matrix[i][j]-temp_type) return(matrix) #Define mean or median def MeanMedianCenter_col(matrix,type): #Loop To Perform mean or median center for i in range(0,len(matrix[0])): if type == "mean": temp_type = np.nanmean([row[i] for row in matrix]) else: temp_type = np.nanmedian([row[i] for row in matrix]) #Probably Should Have if statement checking if stdev equals zero, although this implies the data is already Z-score normalized for j in range(0,len(matrix)): matrix[j][i] = (matrix[j][i]-temp_type) return(matrix) #Divide by sum of the Row Function def Divide_By_Sum_row(matrix): #Loop To Perform mean or median center numRow,numCol= np.shape(matrix) for i in range(numRow): sumValue = sum(matrix[i][:]) #if equals zero if abs(sumValue) > .0001: for j in range(numCol): matrix[i][j] = matrix[i][j]/sumValue else: print("ERROR Cannot divide by Sum almost zero", str(sumValue), " for Row ",str(i+1)) return(matrix) #Divide by sum of the Column Function def Divide_By_Sum_col(matrix): #Loop To Perform mean or median center numRow,numCol= np.shape(matrix) for i in range(numCol): sumValue= 0 #if equals zero if abs(sumValue) > .0001: for j in range(numRow): matrix[j][i] = (matrix[j][i]/sumValue) else: print("ERROR Cannot divide by Sum almost zero", str(sumValue), " for Column ",str(i+1)) return(matrix) #scale or add offset to matrix by row def ScaleOffset_row(matrix,scaleValue,offset): #Loop To Perform scale and offset do one or the other per request if abs(scaleValue) > 0.0001: for i in range(0,len(matrix)): matrix[i][:] = [scaleValue*x+offset for x in matrix[i]] else: print (" Scale facter "+str(scaleValue)+" too small") return(matrix) #scale or add offset to matrix by column def ScaleOffset_col(matrix,scaleValue,offset): #Loop To Perform scale and offset do one or the other per request if abs(scaleValue) > 0.0001: for i in range(0,len(matrix[0])): matrix[:][i] = [scaleValue*x+offset for x in matrix[i]] else: print (" Scale facter "+str(scaleValue)+" too small") return(matrix) #Define Log2 normalization Method def Convert2Logs(matrix,logValue, offset): import warnings warnings.filterwarnings('error') #Loop To Perform Z-Score normalization for i in range(0,len(matrix)): for j in range(0,len(matrix[0])): try: if logValue == "log2": matrix[i][j] = np.log2(matrix[i][j]+offset) else: matrix[i][j] = np.log10(matrix[i][j]+offset) except RuntimeWarning: print(logValue+" normalization Failed: Encountered elements <= 0, which are invalid inputs for a Log normalization") break else: continue break return(matrix) #transpose matrix def Transpose(in_mat): out_mat = [] numRow,numCol= np.shape(in_mat) for i in range(numCol): temp= [] for j in range(numRow): temp.append(in_mat[j][i]) out_mat.append(temp) #print( str(out_mat)) return out_mat # restores row and column labels in ouput def labeler(matrix,og_cols,og_rows,output_file_txt): #Define Null Sets For Col and Row Headers with open(output_file_txt,'w') as f: f.write("") for k in range(0,len(og_cols)): f.write('\t' + str(og_cols[k]) ) f.write('\n') for i in range(0,len(og_rows)): f.write(str(og_rows[i]) ) for j in range(0,len(matrix[0])): f.write('\t' + format(matrix[i][j])) f.write('\n') #Define Main Function def main(): try: args = get_args() scaleValue = float(args.scalevalue) offsetValue= float(args.offsetvalue) #print(args) #sys.stdout.write(str(args)+"\n") matrix,og_cols,og_rows = reader(args.input_file_txt) if args.choice == "z_score_normalization": if args.axes == "Row": matrix = Zscore_row(matrix) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("zcore, row") elif args.axes == "Column": matrix = Zscore_col(matrix) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("zscore, column") else: print("zscore, invalid axis") elif args.choice == "mean_center_normalization": if args.axes == "Row": matrix = MeanMedianCenter_row(matrix,"mean") labeler(matrix,og_cols,og_rows,args.output_file_txt) print("mean-center by row") elif args.axes == "Column": matrix = MeanMedianCenter_col(matrix,"mean") labeler(matrix,og_cols,og_rows,args.output_file_txt) print("mean-center by column") else: print("meancenter, invalid axis") elif args.choice == "median_center_normalization": if args.axes == "Row": matrix = MeanMedianCenter_row(matrix,"median") labeler(matrix,og_cols,og_rows,args.output_file_txt) print("median-center by row") elif args.axes == "Column": matrix = MeanMedianCenter_col(matrix,"median") labeler(matrix,og_cols,og_rows,args.output_file_txt) print("median-center by column") else: print("meancenter, invalid axis") elif args.choice == "add_offset": if args.axes == "Row": #offset = -100 #!!!! TODO REMOVE AND ADD WHEN clause to xml to get value matrix = ScaleOffset_row(matrix,1.0,offsetValue) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("offset of "+str(offsetValue)+" by row") elif args.axes == "Column": matrix = ScaleOffset_col(matrix,1.0,offsetValue) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("offset of "+str(offsetValue)+" by column") else: print("offset"+str(offsetValue)+" invalid axis -not row or column") elif args.choice == "scale": if args.axes == "Row": #scaleValue = 1000 #!!!! TODO REMOVE AND ADD WHEN clause to xml to get value matrix = ScaleOffset_row(matrix,scaleValue,0.0) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("scaling "+str(scaleValue)+" by row") elif args.axes == "Column": matrix = ScaleOffset_col(matrix,scaleValue,0.0) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("scaling "+str(scaleValue)+" by column") else: print("scaling "+str(scaleValue)+" invalid axis") elif args.choice == "transpose": matrix = Transpose(matrix) #issue using same matrix? labeler(matrix,og_rows,og_cols,args.output_file_txt) #swapped row&col labels print("transpose mxn matrix to nxm size") elif args.choice == "ln_normalization": matrix = Convert2Logs(matrix,"log2",offsetValue) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("log2 plus "+str(offsetValue)+" normalization for all values") elif args.choice == "log_normalization": matrix = Convert2Logs(matrix,"log10",offsetValue) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("log10 normalization for all values") elif args.choice == "rank": if args.axes == "Row": matrix = Rankdata_ByRow(matrix) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("performed rank normalization by row") elif args.axes == "Column": matrix = Rankdata_ByColumn(matrix) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("performed rank normalization by column") else: print("rank, invalid axis") elif args.choice == "divide_by_sum": if args.axes == "Row": matrix = Divide_By_Sum_row(matrix) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("performed divide row N values by row N's sum") elif args.axes == "Column": matrix = Divide_By_Sum_col(matrix) labeler(matrix,og_cols,og_rows,args.output_file_txt) print("performed divide column N values by column N's sum") else: print("divide_by_sum, invalid axis") else: print("Invalid normalization Choice") except Exception as err: traceback.print_exc() sys.exit(1) if __name__ == '__main__': main() print("Done")