Mercurial > repos > miller-lab > genome_diversity
changeset 33:5064f618ec1c
remove munkres dependency
| author | Richard Burhans <burhans@bx.psu.edu> | 
|---|---|
| date | Fri, 20 Sep 2013 14:01:30 -0400 | 
| parents | 03c22b722882 | 
| children | f739a296a339 | 
| files | assignment_of_optimal_breeding_pairs.xml munkres.py tool_dependencies.xml | 
| diffstat | 3 files changed, 795 insertions(+), 0 deletions(-) [+] | 
line wrap: on
 line diff
--- a/assignment_of_optimal_breeding_pairs.xml Fri Sep 20 13:54:23 2013 -0400 +++ b/assignment_of_optimal_breeding_pairs.xml Fri Sep 20 14:01:30 2013 -0400 @@ -14,9 +14,11 @@ <data name="output" format="txt" /> </outputs> + <!-- <requirements> <requirement type="package" version="1.0.5.4">munkres</requirement> </requirements> + --> <!-- <tests>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/munkres.py Fri Sep 20 14:01:30 2013 -0400 @@ -0,0 +1,791 @@ +#!/usr/bin/env python +# -*- coding: iso-8859-1 -*- + +# Documentation is intended to be processed by Epydoc. + +""" +Introduction +============ + +The Munkres module provides an implementation of the Munkres algorithm +(also called the Hungarian algorithm or the Kuhn-Munkres algorithm), +useful for solving the Assignment Problem. + +Assignment Problem +================== + +Let *C* be an *n*\ x\ *n* matrix representing the costs of each of *n* workers +to perform any of *n* jobs. The assignment problem is to assign jobs to +workers in a way that minimizes the total cost. Since each worker can perform +only one job and each job can be assigned to only one worker the assignments +represent an independent set of the matrix *C*. + +One way to generate the optimal set is to create all permutations of +the indexes necessary to traverse the matrix so that no row and column +are used more than once. For instance, given this matrix (expressed in +Python):: + + matrix = [[5, 9, 1], + [10, 3, 2], + [8, 7, 4]] + +You could use this code to generate the traversal indexes:: + + def permute(a, results): + if len(a) == 1: + results.insert(len(results), a) + + else: + for i in range(0, len(a)): + element = a[i] + a_copy = [a[j] for j in range(0, len(a)) if j != i] + subresults = [] + permute(a_copy, subresults) + for subresult in subresults: + result = [element] + subresult + results.insert(len(results), result) + + results = [] + permute(range(len(matrix)), results) # [0, 1, 2] for a 3x3 matrix + +After the call to permute(), the results matrix would look like this:: + + [[0, 1, 2], + [0, 2, 1], + [1, 0, 2], + [1, 2, 0], + [2, 0, 1], + [2, 1, 0]] + +You could then use that index matrix to loop over the original cost matrix +and calculate the smallest cost of the combinations:: + + n = len(matrix) + minval = sys.maxint + for row in range(n): + cost = 0 + for col in range(n): + cost += matrix[row][col] + minval = min(cost, minval) + + print minval + +While this approach works fine for small matrices, it does not scale. It +executes in O(*n*!) time: Calculating the permutations for an *n*\ x\ *n* +matrix requires *n*! operations. For a 12x12 matrix, that's 479,001,600 +traversals. Even if you could manage to perform each traversal in just one +millisecond, it would still take more than 133 hours to perform the entire +traversal. A 20x20 matrix would take 2,432,902,008,176,640,000 operations. At +an optimistic millisecond per operation, that's more than 77 million years. + +The Munkres algorithm runs in O(*n*\ ^3) time, rather than O(*n*!). This +package provides an implementation of that algorithm. + +This version is based on +http://www.public.iastate.edu/~ddoty/HungarianAlgorithm.html. + +This version was written for Python by Brian Clapper from the (Ada) algorithm +at the above web site. (The ``Algorithm::Munkres`` Perl version, in CPAN, was +clearly adapted from the same web site.) + +Usage +===== + +Construct a Munkres object:: + + from munkres import Munkres + + m = Munkres() + +Then use it to compute the lowest cost assignment from a cost matrix. Here's +a sample program:: + + from munkres import Munkres, print_matrix + + matrix = [[5, 9, 1], + [10, 3, 2], + [8, 7, 4]] + m = Munkres() + indexes = m.compute(matrix) + print_matrix(matrix, msg='Lowest cost through this matrix:') + total = 0 + for row, column in indexes: + value = matrix[row][column] + total += value + print '(%d, %d) -> %d' % (row, column, value) + print 'total cost: %d' % total + +Running that program produces:: + + Lowest cost through this matrix: + [5, 9, 1] + [10, 3, 2] + [8, 7, 4] + (0, 0) -> 5 + (1, 1) -> 3 + (2, 2) -> 4 + total cost=12 + +The instantiated Munkres object can be used multiple times on different +matrices. + +Non-square Cost Matrices +======================== + +The Munkres algorithm assumes that the cost matrix is square. However, it's +possible to use a rectangular matrix if you first pad it with 0 values to make +it square. This module automatically pads rectangular cost matrices to make +them square. + +Notes: + +- The module operates on a *copy* of the caller's matrix, so any padding will + not be seen by the caller. +- The cost matrix must be rectangular or square. An irregular matrix will + *not* work. + +Calculating Profit, Rather than Cost +==================================== + +The cost matrix is just that: A cost matrix. The Munkres algorithm finds +the combination of elements (one from each row and column) that results in +the smallest cost. It's also possible to use the algorithm to maximize +profit. To do that, however, you have to convert your profit matrix to a +cost matrix. The simplest way to do that is to subtract all elements from a +large value. For example:: + + from munkres import Munkres, print_matrix + + matrix = [[5, 9, 1], + [10, 3, 2], + [8, 7, 4]] + cost_matrix = [] + for row in matrix: + cost_row = [] + for col in row: + cost_row += [sys.maxint - col] + cost_matrix += [cost_row] + + m = Munkres() + indexes = m.compute(cost_matrix) + print_matrix(matrix, msg='Highest profit through this matrix:') + total = 0 + for row, column in indexes: + value = matrix[row][column] + total += value + print '(%d, %d) -> %d' % (row, column, value) + + print 'total profit=%d' % total + +Running that program produces:: + + Highest profit through this matrix: + [5, 9, 1] + [10, 3, 2] + [8, 7, 4] + (0, 1) -> 9 + (1, 0) -> 10 + (2, 2) -> 4 + total profit=23 + +The ``munkres`` module provides a convenience method for creating a cost +matrix from a profit matrix. Since it doesn't know whether the matrix contains +floating point numbers, decimals, or integers, you have to provide the +conversion function; but the convenience method takes care of the actual +creation of the cost matrix:: + + import munkres + + cost_matrix = munkres.make_cost_matrix(matrix, + lambda cost: sys.maxint - cost) + +So, the above profit-calculation program can be recast as:: + + from munkres import Munkres, print_matrix, make_cost_matrix + + matrix = [[5, 9, 1], + [10, 3, 2], + [8, 7, 4]] + cost_matrix = make_cost_matrix(matrix, lambda cost: sys.maxint - cost) + m = Munkres() + indexes = m.compute(cost_matrix) + print_matrix(matrix, msg='Lowest cost through this matrix:') + total = 0 + for row, column in indexes: + value = matrix[row][column] + total += value + print '(%d, %d) -> %d' % (row, column, value) + print 'total profit=%d' % total + +References +========== + +1. http://www.public.iastate.edu/~ddoty/HungarianAlgorithm.html + +2. Harold W. Kuhn. The Hungarian Method for the assignment problem. + *Naval Research Logistics Quarterly*, 2:83-97, 1955. + +3. Harold W. Kuhn. Variants of the Hungarian method for assignment + problems. *Naval Research Logistics Quarterly*, 3: 253-258, 1956. + +4. Munkres, J. Algorithms for the Assignment and Transportation Problems. + *Journal of the Society of Industrial and Applied Mathematics*, + 5(1):32-38, March, 1957. + +5. http://en.wikipedia.org/wiki/Hungarian_algorithm + +Copyright and License +===================== + +This software is released under a BSD license, adapted from +<http://opensource.org/licenses/bsd-license.php> + +Copyright (c) 2008 Brian M. Clapper +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name "clapper.org" nor the names of its contributors may be + used to endorse or promote products derived from this software without + specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. +""" + +__docformat__ = 'restructuredtext' + +# --------------------------------------------------------------------------- +# Imports +# --------------------------------------------------------------------------- + +import sys + +# --------------------------------------------------------------------------- +# Exports +# --------------------------------------------------------------------------- + +__all__ = ['Munkres', 'make_cost_matrix'] + +# --------------------------------------------------------------------------- +# Globals +# --------------------------------------------------------------------------- + +# Info about the module +__version__ = "1.0.5.4" +__author__ = "Brian Clapper, bmc@clapper.org" +__url__ = "http://bmc.github.com/munkres/" +__copyright__ = "(c) 2008 Brian M. Clapper" +__license__ = "BSD-style license" + +# --------------------------------------------------------------------------- +# Classes +# --------------------------------------------------------------------------- + +class Munkres: + """ + Calculate the Munkres solution to the classical assignment problem. + See the module documentation for usage. + """ + + def __init__(self): + """Create a new instance""" + self.C = None + self.row_covered = [] + self.col_covered = [] + self.n = 0 + self.Z0_r = 0 + self.Z0_c = 0 + self.marked = None + self.path = None + + def make_cost_matrix(profit_matrix, inversion_function): + """ + **DEPRECATED** + + Please use the module function ``make_cost_matrix()``. + """ + import munkres + return munkres.make_cost_matrix(profit_matrix, inversion_function) + + make_cost_matrix = staticmethod(make_cost_matrix) + + def pad_matrix(self, matrix, pad_value=0): + """ + Pad a possibly non-square matrix to make it square. + + :Parameters: + matrix : list of lists + matrix to pad + + pad_value : int + value to use to pad the matrix + + :rtype: list of lists + :return: a new, possibly padded, matrix + """ + max_columns = 0 + total_rows = len(matrix) + + for row in matrix: + max_columns = max(max_columns, len(row)) + + total_rows = max(max_columns, total_rows) + + new_matrix = [] + for row in matrix: + row_len = len(row) + new_row = row[:] + if total_rows > row_len: + # Row too short. Pad it. + new_row += [0] * (total_rows - row_len) + new_matrix += [new_row] + + while len(new_matrix) < total_rows: + new_matrix += [[0] * total_rows] + + return new_matrix + + def compute(self, cost_matrix): + """ + Compute the indexes for the lowest-cost pairings between rows and + columns in the database. Returns a list of (row, column) tuples + that can be used to traverse the matrix. + + :Parameters: + cost_matrix : list of lists + The cost matrix. If this cost matrix is not square, it + will be padded with zeros, via a call to ``pad_matrix()``. + (This method does *not* modify the caller's matrix. It + operates on a copy of the matrix.) + + **WARNING**: This code handles square and rectangular + matrices. It does *not* handle irregular matrices. + + :rtype: list + :return: A list of ``(row, column)`` tuples that describe the lowest + cost path through the matrix + + """ + self.C = self.pad_matrix(cost_matrix) + self.n = len(self.C) + self.original_length = len(cost_matrix) + self.original_width = len(cost_matrix[0]) + self.row_covered = [False for i in range(self.n)] + self.col_covered = [False for i in range(self.n)] + self.Z0_r = 0 + self.Z0_c = 0 + self.path = self.__make_matrix(self.n * 2, 0) + self.marked = self.__make_matrix(self.n, 0) + + done = False + step = 1 + + steps = { 1 : self.__step1, + 2 : self.__step2, + 3 : self.__step3, + 4 : self.__step4, + 5 : self.__step5, + 6 : self.__step6 } + + while not done: + try: + func = steps[step] + step = func() + except KeyError: + done = True + + # Look for the starred columns + results = [] + for i in range(self.original_length): + for j in range(self.original_width): + if self.marked[i][j] == 1: + results += [(i, j)] + + return results + + def __copy_matrix(self, matrix): + """Return an exact copy of the supplied matrix""" + return copy.deepcopy(matrix) + + def __make_matrix(self, n, val): + """Create an *n*x*n* matrix, populating it with the specific value.""" + matrix = [] + for i in range(n): + matrix += [[val for j in range(n)]] + return matrix + + def __step1(self): + """ + For each row of the matrix, find the smallest element and + subtract it from every element in its row. Go to Step 2. + """ + C = self.C + n = self.n + for i in range(n): + minval = min(self.C[i]) + # Find the minimum value for this row and subtract that minimum + # from every element in the row. + for j in range(n): + self.C[i][j] -= minval + + return 2 + + def __step2(self): + """ + Find a zero (Z) in the resulting matrix. If there is no starred + zero in its row or column, star Z. Repeat for each element in the + matrix. Go to Step 3. + """ + n = self.n + for i in range(n): + for j in range(n): + if (self.C[i][j] == 0) and \ + (not self.col_covered[j]) and \ + (not self.row_covered[i]): + self.marked[i][j] = 1 + self.col_covered[j] = True + self.row_covered[i] = True + + self.__clear_covers() + return 3 + + def __step3(self): + """ + Cover each column containing a starred zero. If K columns are + covered, the starred zeros describe a complete set of unique + assignments. In this case, Go to DONE, otherwise, Go to Step 4. + """ + n = self.n + count = 0 + for i in range(n): + for j in range(n): + if self.marked[i][j] == 1: + self.col_covered[j] = True + count += 1 + + if count >= n: + step = 7 # done + else: + step = 4 + + return step + + def __step4(self): + """ + Find a noncovered zero and prime it. If there is no starred zero + in the row containing this primed zero, Go to Step 5. Otherwise, + cover this row and uncover the column containing the starred + zero. Continue in this manner until there are no uncovered zeros + left. Save the smallest uncovered value and Go to Step 6. + """ + step = 0 + done = False + row = -1 + col = -1 + star_col = -1 + while not done: + (row, col) = self.__find_a_zero() + if row < 0: + done = True + step = 6 + else: + self.marked[row][col] = 2 + star_col = self.__find_star_in_row(row) + if star_col >= 0: + col = star_col + self.row_covered[row] = True + self.col_covered[col] = False + else: + done = True + self.Z0_r = row + self.Z0_c = col + step = 5 + + return step + + def __step5(self): + """ + Construct a series of alternating primed and starred zeros as + follows. Let Z0 represent the uncovered primed zero found in Step 4. + Let Z1 denote the starred zero in the column of Z0 (if any). + Let Z2 denote the primed zero in the row of Z1 (there will always + be one). Continue until the series terminates at a primed zero + that has no starred zero in its column. Unstar each starred zero + of the series, star each primed zero of the series, erase all + primes and uncover every line in the matrix. Return to Step 3 + """ + count = 0 + path = self.path + path[count][0] = self.Z0_r + path[count][1] = self.Z0_c + done = False + while not done: + row = self.__find_star_in_col(path[count][1]) + if row >= 0: + count += 1 + path[count][0] = row + path[count][1] = path[count-1][1] + else: + done = True + + if not done: + col = self.__find_prime_in_row(path[count][0]) + count += 1 + path[count][0] = path[count-1][0] + path[count][1] = col + + self.__convert_path(path, count) + self.__clear_covers() + self.__erase_primes() + return 3 + + def __step6(self): + """ + Add the value found in Step 4 to every element of each covered + row, and subtract it from every element of each uncovered column. + Return to Step 4 without altering any stars, primes, or covered + lines. + """ + minval = self.__find_smallest() + for i in range(self.n): + for j in range(self.n): + if self.row_covered[i]: + self.C[i][j] += minval + if not self.col_covered[j]: + self.C[i][j] -= minval + return 4 + + def __find_smallest(self): + """Find the smallest uncovered value in the matrix.""" + minval = sys.maxint + for i in range(self.n): + for j in range(self.n): + if (not self.row_covered[i]) and (not self.col_covered[j]): + if minval > self.C[i][j]: + minval = self.C[i][j] + return minval + + def __find_a_zero(self): + """Find the first uncovered element with value 0""" + row = -1 + col = -1 + i = 0 + n = self.n + done = False + + while not done: + j = 0 + while True: + if (self.C[i][j] == 0) and \ + (not self.row_covered[i]) and \ + (not self.col_covered[j]): + row = i + col = j + done = True + j += 1 + if j >= n: + break + i += 1 + if i >= n: + done = True + + return (row, col) + + def __find_star_in_row(self, row): + """ + Find the first starred element in the specified row. Returns + the column index, or -1 if no starred element was found. + """ + col = -1 + for j in range(self.n): + if self.marked[row][j] == 1: + col = j + break + + return col + + def __find_star_in_col(self, col): + """ + Find the first starred element in the specified row. Returns + the row index, or -1 if no starred element was found. + """ + row = -1 + for i in range(self.n): + if self.marked[i][col] == 1: + row = i + break + + return row + + def __find_prime_in_row(self, row): + """ + Find the first prime element in the specified row. Returns + the column index, or -1 if no starred element was found. + """ + col = -1 + for j in range(self.n): + if self.marked[row][j] == 2: + col = j + break + + return col + + def __convert_path(self, path, count): + for i in range(count+1): + if self.marked[path[i][0]][path[i][1]] == 1: + self.marked[path[i][0]][path[i][1]] = 0 + else: + self.marked[path[i][0]][path[i][1]] = 1 + + def __clear_covers(self): + """Clear all covered matrix cells""" + for i in range(self.n): + self.row_covered[i] = False + self.col_covered[i] = False + + def __erase_primes(self): + """Erase all prime markings""" + for i in range(self.n): + for j in range(self.n): + if self.marked[i][j] == 2: + self.marked[i][j] = 0 + +# --------------------------------------------------------------------------- +# Functions +# --------------------------------------------------------------------------- + +def make_cost_matrix(profit_matrix, inversion_function): + """ + Create a cost matrix from a profit matrix by calling + 'inversion_function' to invert each value. The inversion + function must take one numeric argument (of any type) and return + another numeric argument which is presumed to be the cost inverse + of the original profit. + + This is a static method. Call it like this: + + .. python:: + + cost_matrix = Munkres.make_cost_matrix(matrix, inversion_func) + + For example: + + .. python:: + + cost_matrix = Munkres.make_cost_matrix(matrix, lambda x : sys.maxint - x) + + :Parameters: + profit_matrix : list of lists + The matrix to convert from a profit to a cost matrix + + inversion_function : function + The function to use to invert each entry in the profit matrix + + :rtype: list of lists + :return: The converted matrix + """ + cost_matrix = [] + for row in profit_matrix: + cost_matrix.append([inversion_function(value) for value in row]) + return cost_matrix + +def print_matrix(matrix, msg=None): + """ + Convenience function: Displays the contents of a matrix of integers. + + :Parameters: + matrix : list of lists + Matrix to print + + msg : str + Optional message to print before displaying the matrix + """ + import math + + if msg is not None: + print msg + + # Calculate the appropriate format width. + width = 0 + for row in matrix: + for val in row: + width = max(width, int(math.log10(val)) + 1) + + # Make the format string + format = '%%%dd' % width + + # Print the matrix + for row in matrix: + sep = '[' + for val in row: + sys.stdout.write(sep + format % val) + sep = ', ' + sys.stdout.write(']\n') + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == '__main__': + + + matrices = [ + # Square + ([[400, 150, 400], + [400, 450, 600], + [300, 225, 300]], + 850 # expected cost + ), + + # Rectangular variant + ([[400, 150, 400, 1], + [400, 450, 600, 2], + [300, 225, 300, 3]], + 452 # expected cost + ), + + # Square + ([[10, 10, 8], + [ 9, 8, 1], + [ 9, 7, 4]], + 18 + ), + + # Rectangular variant + ([[10, 10, 8, 11], + [ 9, 8, 1, 1], + [ 9, 7, 4, 10]], + 15 + ), + ] + + m = Munkres() + for cost_matrix, expected_total in matrices: + print_matrix(cost_matrix, msg='cost matrix') + indexes = m.compute(cost_matrix) + total_cost = 0 + for r, c in indexes: + x = cost_matrix[r][c] + total_cost += x + print '(%d, %d) -> %d' % (r, c, x) + print 'lowest cost=%d' % total_cost + assert expected_total == total_cost +
--- a/tool_dependencies.xml Fri Sep 20 13:54:23 2013 -0400 +++ b/tool_dependencies.xml Fri Sep 20 14:01:30 2013 -0400 @@ -20,9 +20,11 @@ <package name="mechanize" version="0.2.5"> <repository prior_installation_required="True" toolshed="http://toolshed.g2.bx.psu.edu/" owner="miller-lab" name="package_mechanize_0_2_5" changeset_revision="59801857421b" /> </package> + <!-- <package name="munkres" version="1.0.5.4"> <repository prior_installation_required="True" toolshed="http://toolshed.g2.bx.psu.edu/" owner="miller-lab" name="package_munkres_1_0_5_4" changeset_revision="613b89b28767" /> </package> + --> <package name="networkx" version="1.8.1"> <repository prior_installation_required="True" toolshed="http://toolshed.g2.bx.psu.edu/" owner="miller-lab" name="package_networkx_1_8_1" changeset_revision="43c20433f2d6" /> </package>
