view SMART/Java/Python/misc/Utils.py @ 13:03045debed6e

Uploaded
author m-zytnicki
date Wed, 17 Apr 2013 10:39:35 -0400
parents 769e306b7933
children
line wrap: on
line source

#
# Copyright INRA-URGI 2009-2010
# 
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
# 
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
# 
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
# 
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
#
"""Some useful functions"""

import sys, os
import random
import subprocess


def writeFile(fileName, content):
    """
    Write a string to a file, replacing any previous content
    @param fileName: path of the file to write (opened in "w" mode)
    @type fileName: string
    @param content: the text to write
    @type content: string
    """
    # The context manager closes the handle even if write() raises.
    with open(fileName, "w") as handle:
        handle.write(content)

def sumOfLists(list1, list2):
    """
    Add two lists position by position
    @param list1: first list of addable items
    @type list1: list
    @param list2: second list, which must be as long as the first one
    @type list2: list
    @return: a new list whose i-th item is list1[i] + list2[i]
    Exits the program (sys.exit) when the two lists have different sizes
    """
    if len(list1) == len(list2):
        return [left + right for left, right in zip(list1, list2)]
    sys.exit("Cannot sum list whose sizes are different!")


def protectBackslashes(string):
    """
    Double every backslash found in a string (typically a path)
    @param string: the text to protect
    @type string: string
    @return: a copy of the text where each backslash is written twice
    """
    backslash = "\\"
    return string.replace(backslash, backslash * 2)
    

def getHammingDistance(string1, string2):
    """
    Count the positions at which two strings of equal length differ
    @param string1: a string
    @type string1: string
    @param string2: another string, of the same size
    @type string2: string
    @return: the number of mismatching positions
    Raises an Exception when the sizes of the two strings differ
    """
    if len(string1) == len(string2):
        mismatches = 0
        for left, right in zip(string1, string2):
            if left != right:
                mismatches += 1
        return mismatches
    raise Exception("Error, size of %s and %s differ" % (string1, string2))


def getLevenshteinDistance(string1, string2):
    """
    Compute Levenshtein distance between two strings
    Only two rows of the dynamic programming matrix are kept at any time
    @param string1: a string
    @type string1: string
    @param string2: another string
    @type string2: string
    @return: the minimal number of single-character edits (int)
    """
    # Make string1 the longer one, so that the rows follow the shorter string.
    if len(string1) < len(string2):
        return getLevenshteinDistance(string2, string1)
    if not string1:
        return len(string2)
    # A list built from range() behaves the same on Python 2 and Python 3
    # (xrange only exists on Python 2).
    previousRow = list(range(len(string2) + 1))
    for i, c1 in enumerate(string1):
        # First cell: distance between string1[:i+1] and the empty string.
        currentRow = [i + 1]
        for j, c2 in enumerate(string2):
            insertions    = previousRow[j + 1] + 1
            deletions     = currentRow[j] + 1
            # (c1 != c2) adds 1 only when the characters differ.
            substitutions = previousRow[j] + (c1 != c2)
            currentRow.append(min(insertions, deletions, substitutions))
        previousRow = currentRow
    return previousRow[-1]


def getMinAvgMedMax(values):
    """
    Get some stats about a distribution stored as a dict
    @param values: a distribution (the value being the number of occurrences of the key)
    @type values: dict int to int
    @return: a tuple (minimum, average, median, maximum)
    Raises ValueError when the dict is empty (no minimum can be computed)
    """
    minValues = min(values)
    maxValues = max(values)
    sumValues = sum(key * count for key, count in values.items())
    nbValues = sum(values.values())
    # Expand the distribution into the sorted list of all observations.
    sortedValues = []
    for key in sorted(values):
        sortedValues.extend([key] * values[key])
    # Floor division keeps the indices integral on Python 2 and Python 3.
    middle = nbValues // 2
    if nbValues % 2 == 0:
        # Even number of observations: average the two central ones.
        medValues = (sortedValues[middle - 1] + sortedValues[middle]) / 2.0
    else:
        medValues = sortedValues[middle]
    return (minValues, float(sumValues) / nbValues, medValues, maxValues)


def xor(value1, value2):
    """
    Exclusive or on the truth values of two objects
    @param value1: a value
    @type value1: anything
    @param value2: a value
    @type value2: anything
    @return: True when exactly one of the two values is true, False otherwise
    """
    if value1:
        return not value2
    return bool(value2)


def diff(fileName1, fileName2):
    """
    Compare two files line by line
    A short description of the first difference found is printed on the
    standard output
    @param fileName1: a file name
    @type fileName1: string
    @param fileName2: another file name
    @type fileName2: string
    @return: True if the files have the same lines, False otherwise
    """
    # Context managers make sure both handles are closed once read.
    with open(fileName1) as handle1:
        lines1 = handle1.readlines()
    with open(fileName2) as handle2:
        lines2 = handle2.readlines()
    if len(lines1) != len(lines2):
        # print(...) with a single pre-formatted string outputs the same text
        # on Python 2 and Python 3.
        print("Sizes of files differ (%d != %d)" % (len(lines1), len(lines2)))
        return False
    for i, (line1, line2) in enumerate(zip(lines1, lines2)):
        if line1 != line2:
            # The reported line number is 0-based.
            print("Line %d differ ('%s' != '%s')" % (i, line1.strip(), line2.strip()))
            return False
    return True


def binomialCoefficient(a, b):
    """
    Compute the binomial coefficient "b choose a", i.e. b! / (a! * (b-a)!)
    @param a: the number of chosen elements
    @type    a: int
    @param b: the total number of elements
    @type    b: int
    @return: the coefficient, as a float (infinity if it is too large for a float)
    """
    # Use the symmetry C(b, a) == C(b, b-a) to multiply as few terms as possible.
    if 2 * a > b:
        a = b-a
    # Accumulate with exact integers: float products overflow to infinity
    # (and inf / inf gives nan) long before the final result does.
    p = 1
    for i in range(b-a+1, b+1):
        p *= i
    q = 1
    for i in range(1, a+1):
        q *= i
    # a! always divides a product of a consecutive integers, so // is exact.
    try:
        return float(p // q)
    except OverflowError:
        return float("inf")


# Cache of the p-values already computed by fisherExactPValue,
# keyed by the (a, b, c, d) tuple of its arguments.
memory = {}

# def fisherExactPValue(a, b, c, d):
#     """
#     P-value of Fisher exact test for 2x2 contingency table
#     """
#     if (a, b, c, d) in memory:
#         return memory[(a, b, c, d)]

#     n = a + b + c + d
#     i1 = binomialCoefficient(a, a+b)
#     i2 = binomialCoefficient(c, a+c)
#     i3 = binomialCoefficient(c+d, n)
#     pValue = i1 * i2 / i3

#     memory[(a, b, c, d)] = pValue

#     return pValue
    

def fisherExactPValue(a, b, c, d):
    """
    P-value of Fisher exact test for a 2x2 contingency table, computed by R
    The four counts are given to R as matrix(c(a, b, c, d), nr=2).
    The R executable is "R", unless the SMARTRPATH environment variable is set.
    Results are cached in the module-level dict "memory".
    Exits the program (sys.exit) if R fails or if no p-value can be read from
    its output; in that case the temporary files are left in place.
    @param a: first count
    @type a: int
    @param b: second count
    @type b: int
    @param c: third count
    @type c: int
    @param d: fourth count
    @type d: int
    @return: the p-value (float)
    """
    if (a, b, c, d) in memory:
        return memory[(a, b, c, d)]

    # NOTE(review): the script name only relies on a random number, so two
    # concurrent runs in the same directory may pick the same file -- confirm
    # whether this can happen in practice.
    scriptFileName = "tmpScript-%d.R" % (random.randint(0, 10000))
    with open(scriptFileName, "w") as rScript:
        rScript.write("data = matrix(c(%d, %d, %d, %d), nr=2)\n" % (a, b, c, d))
        rScript.write("fisher.test(data)\n")

    rCommand = "R"
    if "SMARTRPATH" in os.environ:
        rCommand = os.environ["SMARTRPATH"]
    command = "\"%s\" CMD BATCH %s" % (rCommand, scriptFileName)
    status = subprocess.call(command, shell=True)

    if status != 0:
        sys.exit("Problem with the execution of script file %s, status is: %s" % (scriptFileName, status))

    # The R output is read from the file named after the script, plus "out".
    outputRFileName = "%sout" % (scriptFileName)
    pValue = None
    pValueTag = "p-value "
    # The context manager closes the output file before it is removed.
    with open(outputRFileName) as outputRFile:
        for line in outputRFile:
            line = line.strip()
            if line == "": continue
            # The p-value may share its line with other comma-separated fields.
            for splittedLine in line.split(","):
                splittedLine = splittedLine.strip()
                if splittedLine.startswith(pValueTag):
                    pValue = float(splittedLine.split()[-1])
                    break

    if pValue is None:
        sys.exit("Problem with the cannot find p-value! File %s, values are: %d, %d, %d, %d" % (scriptFileName, a, b, c, d))

    os.remove(scriptFileName)
    os.remove(outputRFileName)

    memory[(a, b, c, d)] = pValue

    return pValue


def fisherExactPValueBulk(list):
    """
    P-values of Fisher exact tests for several 2x2 contingency tables,
    computed by a single run of R
    The R executable is "R", unless the SMARTRPATH environment variable is set.
    Exits the program (sys.exit) if R fails or if the number of p-values read
    from its output does not match the number of tables; in that case the
    temporary files are left in place.
    @param list: the tables; the first four items of each element are converted to int
    @type list: sequence of sliceable elements (the slice element[0:2] is used
                as a dict key, so it must be hashable, e.g. tuples)
    @return: a dict mapping element[0:2] to the p-value (float) of that element;
             an empty dict when no table is given
    """
    # Nothing to compute: do not start R just to fail on its empty output.
    if not list:
        return {}

    # NOTE(review): the script name only relies on a random number, so two
    # concurrent runs in the same directory may pick the same file -- confirm
    # whether this can happen in practice.
    scriptFileName = "tmpScript-%d.R" % (random.randint(0, 10000))
    with open(scriptFileName, "w") as rScript:
        for element in list:
            rScript.write("fisher.test(matrix(c(%d, %d, %d, %d), nr=2))$p.value\n" % (int(element[0]), int(element[1]), int(element[2]), int(element[3])))

    rCommand = "R"
    if "SMARTRPATH" in os.environ:
        rCommand = os.environ["SMARTRPATH"]
    command = "\"%s\" CMD BATCH %s" % (rCommand, scriptFileName)
    status = subprocess.call(command, shell=True)

    if status != 0:
        sys.exit("Problem with the execution of script file %s, status is: %s" % (scriptFileName, status))

    # The R output is read from the file named after the script, plus "out".
    outputRFileName = "%sout" % (scriptFileName)
    pValue = None
    pValueTag = "[1] "
    results = {}
    cpt = 0
    # The context manager closes the output file before it is removed.
    with open(outputRFileName) as outputRFile:
        for line in outputRFile:
            line = line.strip()
            if line == "": continue
            if line.startswith(pValueTag):
                pValue = float(line.split()[-1])
                # The n-th result line is attributed to the n-th input element.
                results[list[cpt][0:2]] = pValue
                cpt += 1

    if pValue is None:
        sys.exit("Problem with the cannot find p-value!")
    if cpt != len(list):
        sys.exit("Error in the number of p-values computed by R in file '%s'!" % (scriptFileName))

    os.remove(scriptFileName)
    os.remove(outputRFileName)

    return results