Mercurial > repos > yufei-luo > s_mart
view SMART/Java/Python/misc/Utils.py @ 13:03045debed6e
Uploaded
author | m-zytnicki |
---|---|
date | Wed, 17 Apr 2013 10:39:35 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
# # Copyright INRA-URGI 2009-2010 # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # """Some useful functions""" import sys, os import random import subprocess def writeFile(fileName, content): """ Write the content of a file """ handle = open(fileName, "w") handle.write(content) handle.close() def sumOfLists(list1, list2): """ Element by element sum """ if len(list1) != len(list2): sys.exit("Cannot sum list whose sizes are different!") return [list1[i] + list2[i] for i in range(len(list1))] def protectBackslashes(string): """ Protect the backslashes in a path by adding another backslash """ return string.replace("\\", "\\\\") def getHammingDistance(string1, string2): """ Compute Hamming distance between two strings """ if len(string1) != len(string2): raise Exception("Error, size of %s and %s differ" % (string1, string2)) return sum(ch1 != ch2 for ch1, ch2 in zip(string1, string2)) def getLevenshteinDistance(string1, string2): """ Compute Levenshtein distance between two strings """ if len(string1) < len(string2): return getLevenshteinDistance(string2, string1) if not string1: return len(string2) previousRow = xrange(len(string2) + 1) for i, c1 in enumerate(string1): currentRow = [i + 1] for j, c2 in enumerate(string2): insertions = previousRow[j + 1] + 1 deletions = currentRow[j] + 1 substitutions = previousRow[j] + (c1 != c2) currentRow.append(min(insertions, deletions, substitutions)) previousRow = currentRow return previousRow[-1] def getMinAvgMedMax(values): """ Get some stats about a dict @param values: a distribution (the value being the number of occurrences of the key) @type values: dict int to int @return: a tuple """ minValues = min(values.keys()) maxValues = max(values.keys()) sumValues = sum([value * values[value] for value in values]) nbValues = sum(values.values()) allValues = [] for key in values: for i in range(values[key]): allValues.append(key) sortedValues = sorted(allValues) sorted(values.values()) if (nbValues % 2 == 0): medValues = (sortedValues[nbValues / 2 - 1] + sortedValues[nbValues / 2]) / 2.0 else: medValues = sortedValues[(nbValues + 1) / 2 - 1] return (minValues, float(sumValues) / nbValues, medValues, maxValues) def xor(value1, value2): """ Logical xor @param value1: a value @type value1: anything @param value2: a value @type value2: anything """ return bool(value1) != bool(value2) def diff(fileName1, fileName2): """ Compare two files @param fileName1: a file name @type fileName1: string @param fileName2: another file name @type fileName2: string @return: None if the files are the same, a string otherwise """ handle1 = open(fileName1) lines1 = handle1.readlines() handle2 = open(fileName2) lines2 = handle2.readlines() if len(lines1) != len(lines2): print "Sizes of files differ (%d != %d)" % (len(lines1), len(lines2)) return False for i in xrange(len(lines1)): if lines1[i] != lines2[i]: print "Line %d differ ('%s' != '%s')" % (i, lines1[i].strip(), lines2[i].strip()) return False return True def binomialCoefficient(a, b): """ Compute cumulated product from a to b @param a: a value @type a: int @param b: a value @type b: int """ if a > b / 2: a = b-a p = 1.0 for i in range(b-a+1, b+1): p *= i q = 1.0 for i in range(1, a+1): q *= i return p / q memory = {} # def fisherExactPValue(a, b, c, d): # """ # P-value of Fisher exact test for 2x2 contingency table # """ # if (a, b, c, d) in memory: # return memory[(a, b, c, d)] # n = a + b + c + d # i1 = binomialCoefficient(a, a+b) # i2 = binomialCoefficient(c, a+c) # i3 = binomialCoefficient(c+d, n) # pValue = i1 * i2 / i3 # memory[(a, b, c, d)] = pValue # return pValue def fisherExactPValue(a, b, c, d): if (a, b, c, d) in memory: return memory[(a, b, c, d)] scriptFileName = "tmpScript-%d.R" % (random.randint(0, 10000)) rScript = open(scriptFileName, "w") rScript.write("data = matrix(c(%d, %d, %d, %d), nr=2)\n" % (a, b, c, d)) rScript.write("fisher.test(data)\n") #rScript.write("chisq.test(data)\n") rScript.close() rCommand = "R" if "SMARTRPATH" in os.environ: rCommand = os.environ["SMARTRPATH"] command = "\"%s\" CMD BATCH %s" % (rCommand, scriptFileName) status = subprocess.call(command, shell=True) if status != 0: sys.exit("Problem with the execution of script file %s, status is: %s" % (scriptFileName, status)) outputRFileName = "%sout" % (scriptFileName) outputRFile = open(outputRFileName) pValue = None pValueTag = "p-value " for line in outputRFile: line = line.strip() if line == "": continue for splittedLine in line.split(","): splittedLine = splittedLine.strip() if splittedLine.startswith(pValueTag): pValue = float(splittedLine.split()[-1]) break if pValue == None: sys.exit("Problem with the cannot find p-value! File %s, values are: %d, %d, %d, %d" % (scriptFileName, a, b, c, d)) os.remove(scriptFileName) os.remove(outputRFileName) memory[(a, b, c, d)] = pValue return pValue def fisherExactPValueBulk(list): scriptFileName = "tmpScript-%d.R" % (random.randint(0, 10000)) rScript = open(scriptFileName, "w") for element in list: rScript.write("fisher.test(matrix(c(%d, %d, %d, %d), nr=2))$p.value\n" % (int(element[0]), int(element[1]), int(element[2]), int(element[3]))) rScript.close() rCommand = "R" if "SMARTRPATH" in os.environ: rCommand = os.environ["SMARTRPATH"] command = "\"%s\" CMD BATCH %s" % (rCommand, scriptFileName) status = subprocess.call(command, shell=True) if status != 0: sys.exit("Problem with the execution of script file %s, status is: %s" % (scriptFileName, status)) outputRFileName = "%sout" % (scriptFileName) outputRFile = open(outputRFileName) pValue = None pValueTag = "[1] " results = {} cpt = 0 for line in outputRFile: line = line.strip() if line == "": continue if line.startswith(pValueTag): pValue = float(line.split()[-1]) results[list[cpt][0:2]] = pValue cpt += 1 if pValue == None: sys.exit("Problem with the cannot find p-value!") if cpt != len(list): sys.exit("Error in the number of p-values computed by R in file '%s'!" % (scriptFileName)) os.remove(scriptFileName) os.remove(outputRFileName) return results