view tabular_label_convert/tabular_label_convert.py @ 0:1f93906c2945 draft default tip

Uploaded
author kellrott
date Sun, 18 Nov 2012 01:42:40 -0500
parents
children
line wrap: on
line source

#!/usr/bin/env python

import os
import csv
import sys
import array
import math
from copy import copy
from argparse import ArgumentParser


class DataException(Exception):
    """Raised by FloatMatrix.read when a data row does not match the header width."""


class FloatMatrix:
    """
    Dense matrix of single-precision floats addressed by row and column labels.

    Values live in one flat, row-major ``array.array("f")``; ``rowmap`` and
    ``colmap`` translate labels into row/column indices.  Missing or
    non-numeric cells are stored as NaN.
    """

    def __init__(self):
        self.corner_name = "probe"  # label written in the top-left header cell
        self.data = None            # flat row-major array.array("f")
        self.nrows = None
        self.ncols = None
        self.rowmap = None          # row label -> row index
        self.colmap = None          # column label -> column index

    def read(self, handle):
        """
        Load a tab-separated matrix from an iterable of text lines.

        The first non-blank line is the header (its first cell is ignored, the
        remaining cells are column labels).  Every following line holds a row
        label and one value per column; cells that do not parse as floats
        become NaN.  Blank lines are skipped.

        Raises DataException when a row's cell count differs from the header.
        """
        header = None
        for line in handle:
            # Strip only the line terminator: a bare rstrip() would also eat
            # trailing tabs and thereby drop empty cells at the end of a row.
            line = line.rstrip("\r\n")
            if not line:
                continue
            row = line.split("\t")
            if header is None:
                header = row
                self.data = array.array("f")
                self.rowmap = {}
                self.ncols = len(row) - 1
                self.nrows = 0
                self.colmap = {c: i for i, c in enumerate(row[1:])}
            else:
                if len(row) - 1 != self.ncols:
                    raise DataException("Misformed matrix")
                # Index by the number of rows stored so far.  len(self.rowmap)
                # stops growing after a duplicate label and would point later
                # rows at the wrong offset in self.data.
                self.rowmap[row[0]] = self.nrows
                values = []
                for cell in row[1:]:
                    try:
                        values.append(float(cell))
                    except ValueError:
                        values.append(float("nan"))
                self.data.extend(values)
                self.nrows += 1

    def init_blank(self, rows, cols):
        """Reset to a len(rows) x len(cols) matrix filled with NaN."""
        rows = list(rows)
        cols = list(cols)
        self.colmap = {c: i for i, c in enumerate(cols)}
        self.rowmap = {r: i for i, r in enumerate(rows)}
        self.ncols = len(cols)
        self.nrows = len(rows)
        self.data = array.array("f", [float("nan")]) * (self.nrows * self.ncols)

    def _offset(self, row_name, col_name):
        """Return the flat index of a cell; raises KeyError for unknown labels."""
        return self.rowmap[row_name] * self.ncols + self.colmap[col_name]

    def get_value(self, row_name, col_name):
        """Return the value stored at (row_name, col_name)."""
        return self.data[self._offset(row_name, col_name)]

    def set_value(self, row_name, col_name, value):
        """Store value at (row_name, col_name)."""
        self.data[self._offset(row_name, col_name)] = value

    def get_row(self, row_name):
        """Return the values of one row, in column-index order, as an array slice."""
        start = self.rowmap[row_name] * self.ncols
        return self.data[start:start + self.ncols]

    def get_cols(self):
        """Return the column labels as a list ordered by column index."""
        return sorted(self.colmap, key=self.colmap.get)

    def has_row(self, row):
        """Return True when row is a known row label."""
        return row in self.rowmap

    def has_col(self, col):
        """Return True when col is a known column label."""
        return col in self.colmap

    def get_rows(self):
        """Return the row labels as a list ordered by row index."""
        return sorted(self.rowmap, key=self.rowmap.get)

    def write(self, handle, missing='NA'):
        """
        Write the matrix to handle as tab-separated text.

        The header row is corner_name followed by the column labels; each
        value is formatted with five decimals and NaN is written as missing.
        Rows are emitted in row-index order.
        """
        writer = csv.writer(handle, delimiter="\t", lineterminator='\n')
        col_list = self.get_cols()

        writer.writerow([self.corner_name] + col_list)
        for row_name in self.get_rows():
            row = self.get_row(row_name)
            out = [row_name]
            for col in col_list:
                val = row[self.colmap[col]]
                if val is None or math.isnan(val):
                    out.append(missing)
                else:
                    out.append("%.5f" % (val))
            writer.writerow(out)
            

def median(inList):
    """
    Return the median of the values in inList.

    The input is not modified.  An empty input yields NaN; for an even number
    of values the result is the average of the two middle values.
    """
    # sorted() copies and accepts any iterable, not only lists.
    values = sorted(inList)
    count = len(values)
    if count == 0:
        return float("nan")
    # Floor division keeps the index an int on Python 3 as well as Python 2.
    mid = count // 2
    if count % 2 == 1:
        return values[mid]
    return (values[mid] + values[mid - 1]) / 2.0

def mean(inList):
    """
    Return the arithmetic mean of the values in inList.

    An empty input yields NaN (matching median) instead of raising
    ZeroDivisionError.
    """
    count = len(inList)
    if count == 0:
        return float("nan")
    return sum(inList) / float(count)

def _group_labels_by_alias(aliasMap, is_known_label):
    """Invert aliasMap into {alias: {label: True}}, keeping only labels accepted by is_known_label."""
    grouped = {}
    for label in aliasMap:
        if is_known_label(label):
            for alias in aliasMap[label]:
                grouped.setdefault(alias, {})[label] = True
    return grouped


def aliasRemap(inputMatrix, aliasMap, mode, combine_func):
    """
    Given a inputMatrix and an alias map, create a new matrix with the
    labels from the original matrix remapped to the connected aliases
    from the map.

    inputMatrix  -- matrix exposing has_row/has_col, get_rows/get_cols and
                    get_value (a FloatMatrix in this file)
    aliasMap     -- mapping of original label -> iterable of aliases
    mode         -- "row" to relabel rows, "col" to relabel columns
    combine_func -- called with the list of values that collapse into one
                    output cell; its return value is stored in that cell

    Labels missing from inputMatrix are ignored.  Returns a new FloatMatrix
    whose remapped axis carries the aliases and whose other axis is copied
    from inputMatrix.  Raises ValueError for any other mode (previously the
    function fell through and returned None).
    """
    if mode == "row":
        groups = _group_labels_by_alias(aliasMap, inputMatrix.has_row)
        # get_cols() sorts on every call, so evaluate it once up front.
        samples = inputMatrix.get_cols()
        out = FloatMatrix()
        out.init_blank(rows=list(groups), cols=samples)
        for alias, labels in groups.items():
            for sample in samples:
                values = [inputMatrix.get_value(row_name=label, col_name=sample)
                          for label in labels]
                if values:
                    out.set_value(row_name=alias, col_name=sample,
                                  value=combine_func(values))
        return out

    if mode == "col":
        groups = _group_labels_by_alias(aliasMap, inputMatrix.has_col)
        # get_rows() sorts on every call, so evaluate it once up front.
        row_names = inputMatrix.get_rows()
        out = FloatMatrix()
        out.init_blank(rows=row_names, cols=list(groups))
        for alias, labels in groups.items():
            for row_name in row_names:
                values = [inputMatrix.get_value(row_name=row_name, col_name=label)
                          for label in labels]
                if values:
                    out.set_value(row_name=row_name, col_name=alias,
                                  value=combine_func(values))
        return out

    raise ValueError("Unknown mode %r: expected 'row' or 'col'" % (mode,))


# Combine-method name -> function that reduces a list of values to one value.
combine_map = dict(mean=mean, median=median, max=max, min=min)

if __name__ == "__main__":
    # Command-line entry point: read a matrix and an alias map, remap the
    # row or column labels, and write the combined matrix.
    parser = ArgumentParser()

    # choices= makes argparse reject unsupported values with a usage message
    # instead of failing later with a KeyError / AttributeError.
    parser.add_argument("-m", "--mode", dest="mode", help="Row/Column mode",
                        default="row", choices=["row", "col"])
    parser.add_argument("-c", "--combine", dest="combine", help="Value Combine Method",
                        default="mean", choices=sorted(combine_map))
    parser.add_argument("-o", "--output", help="Output file", default=None)
    parser.add_argument("inTab", help="Input tabular file", default=None)
    parser.add_argument("aliasMap", help="Input alias map", default=None)

    args = parser.parse_args()

    mtx = FloatMatrix()
    with open(args.inTab) as handle:
        mtx.read(handle)

    # Alias map file: one "label<TAB>alias" pair per line; a label may appear
    # on several lines to collect several aliases.
    aliasMap = {}
    with open(args.aliasMap) as handle:
        for line in handle:
            tmp = line.rstrip().split("\t")
            if len(tmp) < 2:
                # Blank or single-column line: there is no alias to record.
                continue
            aliasMap.setdefault(tmp[0], {})[tmp[1]] = True

    out = aliasRemap(mtx, aliasMap, args.mode, combine_map[args.combine])
    if args.output is None:
        # stdout belongs to the interpreter; write to it without closing it.
        out.write(sys.stdout)
    else:
        with open(args.output, "w") as handle:
            out.write(handle)