Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/sql/test/Test_TableSetAdaptator.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/sql/test/Test_TableSetAdaptator.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,330 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + + +import unittest +import time +import os +from commons.core.sql.TableSetAdaptator import TableSetAdaptator +from commons.core.sql.DbMySql import DbMySql +from commons.core.coord.Set import Set + + +class Test_TableSetAdaptator( unittest.TestCase ): + + def setUp( self ): + self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() ) + self._configFileName = "dummyConfigFile_%s" % ( self._uniqId ) + configF = open(self._configFileName, "w" ) + configF.write( "[repet_env]\n" ) + configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) ) + configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) ) + configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) ) + configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) ) + configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) ) + configF.close() + self._iDb = DbMySql( cfgFileName=self._configFileName ) + self._table = "dummySetTable_%s" % ( self._uniqId ) + self._tSetA = TableSetAdaptator( self._iDb, self._table ) + + def tearDown( self ): + self._uniqId = None + self._iDb.dropTable( self._table ) + self._iDb.close() + self._table = None + self._tSetA = None + os.remove( self._configFileName ) + self._configFileName = "" + + def test_insert(self): + set2Insert = Set() + set2Insert.id = 1 + set2Insert.name = "name1" + set2Insert.seqname = "name2" + set2Insert.start = 1L + set2Insert.end = 50L + self._iDb.createTable( self._table, "set", "" ) + self._tSetA.insert( set2Insert, False ) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + expTsetTuple = ((1, "name1", "name2", 1L, 50L),) + obsTsetTuples = self._iDb.cursor.fetchall() + self.assertEquals(expTsetTuple, obsTsetTuples ) + + def test_insertList ( self ): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + lset = [ set1, set2 ] + self._tSetA.insertList( lset ) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + expTsetTuple = ((1, "name1", "desc1", 1, 120), (2, "name2", "desc2", 1, 20)) + obsTsetTuples = self._iDb.cursor.fetchall() + self.assertEqual( expTsetTuple, obsTsetTuples ) + + def test_getIdList(self): + set2Insert = Set() + set2Insert.id = 1 + set2Insert.name = "name1" + set2Insert.seqname = "name2" + set2Insert.start = 1 + set2Insert.end = 50 + self._iDb.createTable( self._table, "set", "" ) + self._tSetA.insert( set2Insert ) + l = self._tSetA.getIdList() + self.assertEquals( set2Insert.id, l[0] ) + + def test_getSeqNameList(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + set3 = Set() + set3.setFromString( "2\tname2\tdesc2\t1\t50\n" ) + for m in [ set1, set2, set3 ]: self._tSetA.insert( m ) + lExp = ["desc1", "desc2"] + lObs = self._tSetA.getSeqNameList() + self.assertEqual( lObs, lExp ) + + def test_getSetListFromSeqName(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + set3 = Set() + set3.setFromString( "3\tname2\tdesc2\t1\t50\n" ) + for m in [ set1, set2, set3 ]: self._tSetA.insert( m ) + explSet = [Set( 2,"name2", "desc2", 1, 20), Set( 3,"name2", "desc2", 1, 50)] + obslSet = self._tSetA.getSetListFromSeqName("desc2") + self.assertEqual( explSet, obslSet ) + + def test_getSetListFromId(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + lset = [ set1, set2 ] + self._tSetA.insertList( lset ) + explSet = [Set( 2,"name2", "desc2", 1, 20)] + obslSet = self._tSetA.getSetListFromId(2) + self.assertEqual( explSet, obslSet ) + + def test_getSetListFromIdList(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + lset = [ set1, set2 ] + self._tSetA.insertList( lset ) + explSet = [Set( 1, "name1", "desc1", 1, 120), Set( 2,"name2", "desc2", 1, 20)] + lId = [1, 2] + obslSet = self._tSetA.getSetListFromIdList(lId) + self.assertEqual( explSet, obslSet ) + + def test_getSetListFromIdList_emptyList(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + lset = [ set1, set2 ] + self._tSetA.insertList( lset ) + explSet = [] + lId = [] + obslSet = self._tSetA.getSetListFromIdList(lId) + self.assertEqual( explSet, obslSet ) + + def test_getSetListOverlappingCoord(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc2\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + lset = [ set1, set2 ] + self._tSetA.insertList( lset ) + explSet = [Set( 1,"name1", "desc2", 1, 120)] + obslSet = self._tSetA.getSetListOverlappingCoord("desc2", 30, 70) + self.assertEqual( explSet, obslSet ) + + def test_deleteFromId(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + set3 = Set() + set3.setFromString( "3\tname2\tdesc3\t1\t50\n" ) + for m in [ set1, set2, set3 ]: self._tSetA.insert( m ) + self._tSetA.deleteFromId(1) + expTSetTuples = (( 2,"name2", "desc2", 1, 20), ( 3,"name2", "desc3", 1, 50)) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + obsTsetTuples = self._iDb.cursor.fetchall() + + self.assertEqual( expTSetTuples, obsTsetTuples ) + + def test_deleteFromIdList(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + set3 = Set() + set3.setFromString( "3\tname2\tdesc3\t1\t50\n" ) + for m in [ set1, set2, set3 ]: self._tSetA.insert( m ) + lId2del = [1, 2] + self._tSetA.deleteFromIdList(lId2del) + expTSetTuples = (( 3,"name2", "desc3", 1, 50),) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + obsTsetTuples = self._iDb.cursor.fetchall() + + self.assertEqual( expTSetTuples, obsTsetTuples ) + + def test_deleteFromIdListWithEmptyList(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + set3 = Set() + set3.setFromString( "3\tname2\tdesc3\t1\t50\n" ) + for m in [ set1, set2, set3 ]: self._tSetA.insert( m ) + lId2del = [] + self._tSetA.deleteFromIdList(lId2del) + expTSetTuples = ((1L, 'name1', 'desc1', 1L, 120L), (2L, 'name2', 'desc2', 1L, 20L), (3L, 'name2', 'desc3', 1L, 50L)) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + obsTsetTuples = self._iDb.cursor.fetchall() + + self.assertEqual( expTSetTuples, obsTsetTuples ) + + def test_joinTwoSets(self): + self._iDb.createTable( self._table, "set", "" ) + idSet1 = 5 + set1 = Set() + set1.setFromString( "5\tname1\tdesc1\t1\t120\n" ) + idSet2 = 2 + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + lset = [ set1, set2 ] + self._tSetA.insertList( lset ) + self._tSetA.joinTwoSets(idSet1, idSet2) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + + expTSetTuples = ((2L, "name1", "desc1", 1L, 120L ), (2L, "name2", "desc2", 1L, 20L )) + obsTSetTuples = self._iDb.cursor.fetchall() + + self.assertEqual( expTSetTuples, obsTSetTuples) + self._iDb.dropTable(self._table) + + def test_joinTwoSetsWhereId1InfId2(self): + self._iDb.createTable( self._table, "set", "" ) + idSet1 = 2 + set1 = Set() + set1.setFromString( "5\tname1\tdesc1\t1\t120\n" ) + + idSet2 = 5 + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + + lset = [ set1, set2 ] + self._tSetA.insertList( lset ) + + self._tSetA.joinTwoSets(idSet1, idSet2) + + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + + expTSetTuples = ((2L, "name1", "desc1", 1L, 120L ), (2L, "name2", "desc2", 1L, 20L )) + obsTSetTuples = self._iDb.cursor.fetchall() + + self.assertEqual( expTSetTuples, obsTSetTuples) + self._iDb.dropTable(self._table) + + def test_getNewId(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) + set2 = Set() + set2.setFromString( "2\tname2\tdesc2\t1\t20\n" ) + set3 = Set() + set3.setFromString( "5\tname1\tdesc1\t1\t120\n" ) + set4 = Set() + set4.setFromString( "8\tname2\tdesc2\t1\t20\n" ) + lset = [ set1, set2, set3, set4 ] + self._tSetA.insertList( lset ) + expId = 9 + obsId = self._tSetA.getNewId() + self.assertEqual( expId, obsId) + self._iDb.dropTable(self._table) + + def test_getNewId_set_null(self): + self._iDb.createTable( self._table, "set", "" ) + set1 = Set() + lset = [ set1 ] + self._tSetA.insertList( lset ) + expId = 1 + obsId = self._tSetA.getNewId() + self.assertEqual( expId, obsId) + self._iDb.dropTable(self._table) + + def test_getListOfAllSets( self ): + self._iDb.createTable( self._table, "set" ) + s1 = Set() + s1.setFromString( "1\tchr1\tTE3\t1\t10\n" ) + s2a = Set() + s2a.setFromString( "2\tchr1\tTE2\t2\t9\n" ) + s2b = Set() + s2b.setFromString( "2\tchr1\tTE2\t12\t19\n" ) + lSets = [ s1, s2a, s2b ] + self._tSetA.insertList( lSets ) + expLSets = [ s1, s2a, s2b ] + obsLSets = self._tSetA.getListOfAllSets() + self.assertEqual( expLSets, obsLSets ) + + def test_getListOfAllSets_empty_table( self ): + self._iDb.createTable( self._table, "set" ) + expList = [] + obsList = self._tSetA.getListOfAllSets() + self.assertEqual( expList, obsList ) + +test_suite = unittest.TestSuite() +test_suite.addTest( unittest.makeSuite( Test_TableSetAdaptator ) ) +if __name__ == "__main__": + unittest.TextTestRunner(verbosity=2).run( test_suite )