Mercurial > repos > kellrott > tabular_label_convert
view tabular_label_convert/tabular_label_convert.py @ 0:1f93906c2945 draft default tip
Uploaded
author | kellrott |
---|---|
date | Sun, 18 Nov 2012 01:42:40 -0500 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python
"""Relabel the rows or columns of a tab-separated float matrix.

The script reads a tabular matrix (first row = column labels, first column =
row labels, remaining cells = numbers) and a two-column alias map
(``label<TAB>alias``).  Every label found in the matrix is replaced by its
alias(es); when several labels map to the same alias their values are
combined with the selected function (mean, median, max or min).
"""

import os
import csv
import sys
import array
import math
from copy import copy
from argparse import ArgumentParser


class DataException(Exception):
    """Raised when an input file does not have the expected tabular shape."""


class FloatMatrix:
    """Dense matrix of single-precision floats addressed by row/column label.

    Values are stored row-major in one flat ``array.array("f")``; ``rowmap``
    and ``colmap`` translate labels to zero-based indices.
    """

    def __init__(self):
        self.corner_name = "probe"  # text written in the top-left header cell
        self.data = None
        self.nrows = None
        self.ncols = None
        self.rowmap = None
        self.colmap = None

    def read(self, handle):
        """Load the matrix from an iterable of tab-separated text lines.

        The first non-blank line is the header.  Cells that cannot be parsed
        as a float (including empty cells) are stored as NaN.

        Raises:
            DataException: if a data row does not have as many cells as the
                header.
        """
        header = None
        for line in handle:
            # Strip only the line terminator: stripping all whitespace would
            # also remove trailing tabs and so drop empty trailing cells.
            line = line.rstrip("\r\n")
            if not line:
                continue
            row = line.split("\t")
            if header is None:
                header = row
                self.data = array.array("f")
                self.colmap = {}
                self.rowmap = {}
                self.ncols = len(row) - 1
                self.nrows = 0
                for i, c in enumerate(row[1:]):
                    self.colmap[c] = i
            else:
                if len(row) - 1 != self.ncols:
                    raise DataException("Misformed matrix")
                self.rowmap[row[0]] = len(self.rowmap)
                values = []
                for cell in row[1:]:
                    try:
                        values.append(float(cell))
                    except ValueError:
                        values.append(float('nan'))
                self.data.extend(values)
                self.nrows += 1

    def init_blank(self, rows, cols):
        """Create a NaN-filled matrix with the given row and column labels."""
        self.data = array.array("f")
        self.colmap = {}
        for i, c in enumerate(cols):
            self.colmap[c] = i
        self.rowmap = {}
        for i, r in enumerate(rows):
            self.rowmap[r] = i
        self.ncols = len(cols)
        self.nrows = len(rows)
        self.data.extend([float('nan')] * (self.nrows * self.ncols))

    def _offset(self, row_name, col_name):
        """Return the flat index of the cell (row_name, col_name)."""
        return self.rowmap[row_name] * self.ncols + self.colmap[col_name]

    def get_value(self, row_name, col_name):
        """Return the value stored at (row_name, col_name)."""
        return self.data[self._offset(row_name, col_name)]

    def set_value(self, row_name, col_name, value):
        """Store ``value`` at (row_name, col_name)."""
        self.data[self._offset(row_name, col_name)] = value

    def get_row(self, row_name):
        """Return the values of one row as an array, in column-index order."""
        start = self.rowmap[row_name] * self.ncols
        return self.data[start:start + self.ncols]

    def get_cols(self):
        """Return the column labels as a list ordered by column index."""
        return sorted(self.colmap.keys(), key=self.colmap.get)

    def has_row(self, row):
        """Return True if ``row`` is a known row label."""
        return row in self.rowmap

    def has_col(self, col):
        """Return True if ``col`` is a known column label."""
        return col in self.colmap

    def get_rows(self):
        """Return the row labels as a list ordered by row index."""
        return sorted(self.rowmap.keys(), key=self.rowmap.get)

    def write(self, handle, missing='NA'):
        """Write the matrix as tab-separated text to ``handle``.

        Values are formatted with five decimals; NaN cells are written as
        ``missing``.  Rows are emitted in row-index order so the output is
        deterministic.
        """
        writer = csv.writer(handle, delimiter="\t", lineterminator='\n')
        col_list = self.get_cols()
        writer.writerow([self.corner_name] + col_list)
        for row_name in self.get_rows():
            row = self.get_row(row_name)
            out = [row_name]
            for col in col_list:
                val = row[self.colmap[col]]
                if val is None or math.isnan(val):
                    val = missing
                else:
                    val = "%.5f" % (val)
                out.append(val)
            writer.writerow(out)


def median(inList):
    """Return the median of ``inList`` (NaN for an empty list).

    The input is not modified.
    """
    ordered = sorted(inList)
    count = len(ordered)
    if count == 0:
        return float("nan")
    if count == 1:
        return ordered[0]
    # Floor division keeps the index an int on Python 3 as well as Python 2.
    middle = count // 2
    if count % 2 == 1:
        return ordered[middle]
    return (ordered[middle] + ordered[middle - 1]) / 2.0


def mean(inList):
    """Return the arithmetic mean of a non-empty list of numbers."""
    return sum(inList) / float(len(inList))


def _group_by_alias(aliasMap, has_label):
    """Invert ``aliasMap`` into alias -> {label: True} for labels in use.

    ``has_label`` is a predicate telling whether a label exists in the
    matrix; labels it rejects are ignored.
    """
    groups = {}
    for label in aliasMap:
        if has_label(label):
            for alias in aliasMap[label]:
                groups.setdefault(alias, {})[label] = True
    return groups


def aliasRemap(inputMatrix, aliasMap, mode, combine_func):
    """Build a new matrix whose row or column labels are replaced by aliases.

    Args:
        inputMatrix: FloatMatrix to relabel.
        aliasMap: mapping label -> iterable of aliases for that label.
        mode: "row" to relabel rows, "col" to relabel columns.
        combine_func: callable taking a list of values and returning the
            single value to store when several labels share an alias.

    Returns:
        A new FloatMatrix containing only the aliased rows (or columns).

    Raises:
        ValueError: if ``mode`` is neither "row" nor "col".
    """
    if mode == "row":
        groups = _group_by_alias(aliasMap, inputMatrix.has_row)
        samples = inputMatrix.get_cols()
        out = FloatMatrix()
        out.init_blank(rows=list(groups), cols=samples)
        for alias in groups:
            for sample in samples:
                values = [
                    inputMatrix.get_value(row_name=label, col_name=sample)
                    for label in groups[alias]
                ]
                if values:
                    out.set_value(row_name=alias, col_name=sample,
                                  value=combine_func(values))
        return out

    if mode == "col":
        groups = _group_by_alias(aliasMap, inputMatrix.has_col)
        row_names = inputMatrix.get_rows()
        out = FloatMatrix()
        out.init_blank(rows=row_names, cols=list(groups))
        for alias in groups:
            for row_name in row_names:
                values = [
                    inputMatrix.get_value(row_name=row_name, col_name=label)
                    for label in groups[alias]
                ]
                if values:
                    out.set_value(row_name=row_name, col_name=alias,
                                  value=combine_func(values))
        return out

    raise ValueError("Unknown mode %r (expected 'row' or 'col')" % (mode,))


# Command-line names of the available value-combining functions.
combine_map = {
    "mean": mean,
    "median": median,
    "max": max,
    "min": min
}


def _read_alias_map(handle):
    """Parse ``label<TAB>alias`` lines into label -> {alias: True}.

    Blank lines are skipped.

    Raises:
        DataException: if a non-blank line has fewer than two columns.
    """
    alias_map = {}
    for line in handle:
        stripped = line.rstrip()
        if not stripped:
            continue
        fields = stripped.split("\t")
        if len(fields) < 2:
            raise DataException("Misformed alias map line: %r" % (stripped,))
        alias_map.setdefault(fields[0], {})[fields[1]] = True
    return alias_map


def main(argv=None):
    """Command-line entry point; ``argv`` defaults to ``sys.argv[1:]``."""
    parser = ArgumentParser()
    parser.add_argument("-m", "--mode", dest="mode", help="Row/Column mode",
                        choices=["row", "col"], default="row")
    parser.add_argument("-c", "--combine", dest="combine",
                        help="Value Combine Method",
                        choices=sorted(combine_map), default="mean")
    parser.add_argument("-o", "--output", help="Output file", default=None)
    parser.add_argument("inTab", help="Input tabular file")
    parser.add_argument("aliasMap", help="Input alias map")
    args = parser.parse_args(argv)

    mtx = FloatMatrix()
    with open(args.inTab) as handle:
        mtx.read(handle)

    with open(args.aliasMap) as handle:
        aliasMap = _read_alias_map(handle)

    out = aliasRemap(mtx, aliasMap, args.mode, combine_map[args.combine])

    if args.output is None:
        # Leave sys.stdout open; it is not ours to close.
        out.write(sys.stdout)
    else:
        with open(args.output, "w") as handle:
            out.write(handle)


if __name__ == "__main__":
    main()