Mercurial > repos > yufei-luo > s_mart
view commons/core/sql/test/Test_TableBinSetAdaptator.py @ 14:c79b9ae3f65f
Deleted selected files
author | m-zytnicki |
---|---|
date | Fri, 19 Apr 2013 10:13:11 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
"""Unit tests for TableBinSetAdaptator, run against a real database connection."""

import unittest
import os
import time
from commons.core.sql.TableBinSetAdaptator import TableBinSetAdaptator
from commons.core.coord.Set import Set
from commons.core.sql.DbFactory import DbFactory


class Test_TableBinSetAdaptator(unittest.TestCase):
    """Exercise TableBinSetAdaptator on a three-row "set" table and its "_idx" bin table.

    Every test gets its own uniquely named table and fixture file (see setUp),
    both of which are removed again in tearDown.
    """

    # Rows loaded into the set table by setUp (tab-separated, one set per line).
    _DEFAULT_SET_LINES = (
        "1\tseq1\tchr1\t1900\t3900\n",
        "2\tseq2\tchr1\t2\t9\n",
        "3\tseq3\tchr1\t8\t13\n",
    )

    def setUp(self):
        """Create the fixture file, load it into a fresh set table and build the adaptator."""
        # Timestamp + pid keeps table and file names unique across concurrent runs.
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._iDb = DbFactory.createInstance()
        radicalTableName = "dummySetTable"
        self._tableName = "%s_%s" % (radicalTableName, self._uniqId)
        self._tableName_bin = "%s_idx" % self._tableName
        self._setFileName = "dummySetFile_%s" % self._uniqId
        self._writeSetFile(self._DEFAULT_SET_LINES)
        self._iDb.createTable(self._tableName, "set", self._setFileName)
        self._iTableBinSetAdaptator = TableBinSetAdaptator(self._iDb, self._tableName)

    def tearDown(self):
        """Drop both tables, then always close the connection and delete the fixture file."""
        try:
            self._iDb.dropTable(self._tableName)
            self._iDb.dropTable(self._tableName_bin)
        finally:
            # Run the remaining cleanup even when a drop fails, so that a broken
            # test does not leave connections or files behind.
            self._iDb.close()
            if os.path.exists(self._setFileName):
                os.remove(self._setFileName)

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    def _writeSetFile(self, lLines):
        """(Re)write the fixture file with the given lines; an empty sequence yields an empty file."""
        with open(self._setFileName, "w") as setF:
            setF.writelines(lLines)

    def _fetchAllRows(self, tableName):
        """Return every row of tableName as delivered by the database cursor."""
        sqlCmd = "SELECT * FROM %s" % (tableName)
        self._iDb.execute(sqlCmd)
        return self._iDb.cursor.fetchall()

    def _assertBinAndSetTableContent(self, expTupleInBinTable, expTupleInSetTable):
        """Check the full content of the bin table first, then of the set table."""
        obsTupleInBinTable = self._fetchAllRows(self._tableName_bin)
        self.assertEqual(expTupleInBinTable, obsTupleInBinTable)
        obsTupleInSetTable = self._fetchAllRows(self._tableName)
        self.assertEqual(expTupleInSetTable, obsTupleInSetTable)

    # ------------------------------------------------------------------
    # insertion of a single set
    # ------------------------------------------------------------------

    def test_insASetInSetAndBinTable(self):
        """Inserting one set appends one row to both the bin table and the set table."""
        iSet = Set(1, "set1", "seq1", 2, 1)
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (2, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
            (1, 1000.0, 'seq1', 1, 2, 0),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (2, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
            (1, 'set1', 'seq1', 2, 1),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)

    def test_insASetInSetAndBinTable_delayedCase(self):
        """Same expectations as the plain insertion when the second argument is True."""
        iSet = Set(1, "set1", "seq1", 2, 1)
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insASetInSetAndBinTable(iSet, True)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (2, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
            (1, 1000.0, 'seq1', 1, 2, 0),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (2, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
            (1, 'set1', 'seq1', 2, 1),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)

    # ------------------------------------------------------------------
    # deletion
    # ------------------------------------------------------------------

    def test_deleteFromIdFromSetAndBinTable(self):
        """Deleting id 2 removes its row from both tables."""
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromIdFromSetAndBinTable(2)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (3, 'seq3', 'chr1', 8, 13),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)

    def test_deleteFromListIdFromSetAndBinTable(self):
        """Deleting ids 1 and 2 leaves only the row of id 3 in both tables."""
        lSetToRemove = [1, 2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.deleteFromListIdFromSetAndBinTable(lSetToRemove)
        expTupleInBinTable = ((3, 1000.0, 'chr1', 8, 13, 1),)
        expTupleInSetTable = ((3, 'seq3', 'chr1', 8, 13),)
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)
        # The fixture file is deleted by tearDown; no explicit removal needed here.

    # ------------------------------------------------------------------
    # joining two sets
    # ------------------------------------------------------------------

    def test_joinTwoSetsFromSetAndBinTable(self):
        """Joining ids 1 and 2 relabels the rows of id 2 as id 1 and returns 1."""
        id1 = 1
        id2 = 2
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (1, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (1, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
        )
        expNewId = 1
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)
        self.assertEqual(expNewId, obsNewId)

    def test_joinTwoSetsFromSetAndBinTable_with_reversed_id(self):
        """Argument order does not matter: joining (2, 1) gives the same result as (1, 2)."""
        id1 = 2
        id2 = 1
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.joinTwoSetsFromSetAndBinTable(id1, id2)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (1, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (1, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
        )
        expNewId = 1
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)
        self.assertEqual(expNewId, obsNewId)

    # ------------------------------------------------------------------
    # identifiers
    # ------------------------------------------------------------------

    def test_getNewId(self):
        """With ids 1 to 3 present, the next new id is 4."""
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.getNewId()
        expNewId = 4
        self.assertEqual(expNewId, obsNewId)

    def test_getNewId_empty_table(self):
        """On a table rebuilt from an empty file, the first new id is 1."""
        self._iDb.dropTable(self._tableName)
        self._iDb.dropTable(self._tableName_bin)
        self._writeSetFile([])
        self._iDb.createTable(self._tableName, "set", self._setFileName)
        self._iDb.createBinSetTable(self._tableName, True)
        obsNewId = self._iTableBinSetAdaptator.getNewId()
        expNewId = 1
        self.assertEqual(expNewId, obsNewId)

    # ------------------------------------------------------------------
    # coordinate queries
    # ------------------------------------------------------------------

    def test_getSetListFromQueryCoord(self):
        """Querying chr1:10-4000 returns the sets with ids 1 and 3."""
        start = 10
        end = 4000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
        iSet2 = Set(3, "seq3", "chr1", 8, 13)
        expLSet = [iSet1, iSet2]
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListFromQueryCoord_return_empty_list(self):
        """Querying chr1:4000-40000 matches no set."""
        start = 4000
        end = 40000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListFromQueryCoord(seqName, start, end)
        expLSet = []
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord(self):
        """Only the set with id 1 lies strictly inside chr1:10-4000."""
        start = 10
        end = 4000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
        iSet1 = Set(1, "seq1", "chr1", 1900, 3900)
        expLSet = [iSet1]
        self.assertEqual(expLSet, obsLSet)

    def test_getSetListStrictlyIncludedInQueryCoord_return_empty_list(self):
        """No set lies strictly inside chr1:4000-40000."""
        start = 4000
        end = 40000
        seqName = 'chr1'
        self._iDb.createBinSetTable(self._tableName, True)
        obsLSet = self._iTableBinSetAdaptator.getSetListStrictlyIncludedInQueryCoord(seqName, start, end)
        expLSet = []
        self.assertEqual(expLSet, obsLSet)

    # ------------------------------------------------------------------
    # listings
    # ------------------------------------------------------------------

    def test_getIdList(self):
        """The id list of the default fixture is [1, 2, 3]."""
        expLId = [1, 2, 3]
        self._iDb.createBinSetTable(self._tableName, True)
        obsLId = self._iTableBinSetAdaptator.getIdList()
        self.assertEqual(expLId, obsLId)

    def test_getSeqNameList(self):
        """With sets on chr2 and chr1, the sequence names come back as ["chr1", "chr2"]."""
        self._iDb.dropTable(self._tableName)
        self._iDb.dropTable(self._tableName_bin)
        self._writeSetFile([
            "1\tseq1\tchr2\t1900\t3900\n",
            "2\tseq2\tchr1\t2\t9\n",
            "3\tseq3\tchr1\t8\t13\n",
        ])
        self._iDb.createTable(self._tableName, "set", self._setFileName)
        self._iDb.createBinSetTable(self._tableName, True)
        expLSeqName = ["chr1", "chr2"]
        obsLSeqName = self._iTableBinSetAdaptator.getSeqNameList()
        self.assertEqual(expLSeqName, obsLSeqName)

    # ------------------------------------------------------------------
    # insertion of a list of sets
    # ------------------------------------------------------------------

    def test_insertListInSetAndBinTable(self):
        """Both sets of the list are appended under the same new id 4."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTable(lSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (2, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
            (4, 1000.0, 'chr1', 100, 390, 1),
            (4, 1000.0, 'chr1', 1, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (2, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
            (4, 'seq4', 'chr1', 100, 390),
            (4, 'seq5', 'chr1', 1, 13),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndMergeAllSets(self):
        """Rows of ids 2 and 3 are replaced by a single row (id 5, seq5, 1-13); seq4 is stored as id 4."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndMergeAllSets(lSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (5, 1000.0, 'chr1', 1, 13, 1),
            (4, 1000.0, 'chr1', 100, 390, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (5, 'seq5', 'chr1', 1, 13),
            (4, 'seq4', 'chr1', 100, 390),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps(self):
        """Only seq4 (100-390) is inserted; seq5 (1-13) is not added to either table."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 1, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (2, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
            (4, 1000.0, 'chr1', 100, 390, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (2, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
            (4, 'seq4', 'chr1', 100, 390),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps_Without_Overlaps(self):
        """Both new sets are inserted, under the distinct ids 4 and 5."""
        iSet1 = Set(1, "seq4", "chr1", 100, 390)
        iSet2 = Set(2, "seq5", "chr1", 50, 65)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (2, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
            (4, 1000.0, 'chr1', 100, 390, 1),
            (5, 1000.0, 'chr1', 50, 65, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (2, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
            (4, 'seq4', 'chr1', 100, 390),
            (5, 'seq5', 'chr1', 50, 65),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)

    def test_insertListInSetAndBinTableAndRemoveOverlaps_With_Only_Overlaps(self):
        """Neither new set is inserted; both tables keep their original three rows."""
        iSet1 = Set(1, "seq4", "chr1", 1, 5)
        iSet2 = Set(2, "seq5", "chr1", 8, 13)
        lSet = [iSet1, iSet2]
        self._iDb.createBinSetTable(self._tableName, True)
        self._iTableBinSetAdaptator.insertListInSetAndBinTableAndRemoveOverlaps(lSet)
        expTupleInBinTable = (
            (1, 10000.0, 'chr1', 1900, 3900, 1),
            (2, 1000.0, 'chr1', 2, 9, 1),
            (3, 1000.0, 'chr1', 8, 13, 1),
        )
        expTupleInSetTable = (
            (1, 'seq1', 'chr1', 1900, 3900),
            (2, 'seq2', 'chr1', 2, 9),
            (3, 'seq3', 'chr1', 8, 13),
        )
        self._assertBinAndSetTableContent(expTupleInBinTable, expTupleInSetTable)


if __name__ == "__main__":
    unittest.main()