Mercurial > repos > yufei-luo > s_mart
diff commons/core/sql/test/Test_TableMapAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/sql/test/Test_TableMapAdaptator.py Fri Jan 18 04:54:14 2013 -0500 @@ -0,0 +1,250 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + + +import unittest +import time +import os +from commons.core.sql.TableMapAdaptator import TableMapAdaptator +from commons.core.sql.DbMySql import DbMySql +from commons.core.coord.Map import Map +from commons.core.coord.Set import Set + + +class Test_TableMapAdaptator( unittest.TestCase ): + + def setUp( self ): + self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() ) + self._configFileName = "dummyConfigFile_%s" % ( self._uniqId ) + configF = open(self._configFileName, "w" ) + configF.write( "[repet_env]\n" ) + configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) ) + configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) ) + configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) ) + configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) ) + configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) ) + configF.close() + self._iDb = DbMySql( cfgFileName=self._configFileName ) + self._table = "dummyMapTable_%s" % ( self._uniqId ) + self._tMapA = TableMapAdaptator( self._iDb, self._table ) + + + def tearDown( self ): + self._uniqId = None + self._iDb.dropTable( self._table ) + self._iDb.close() + self._table = None + self._tMapA = None + os.remove( self._configFileName ) + self._configFileName = "" + +################################################################################## +################## Tests for methods in ITableMapAdaptator ####################### +################################################################################## + + def test_getEndFromSeqName(self): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc2\t1\t20\n" ) + for m in [ map1, map2]: + self._tMapA.insert(m) + expEnd = 20 + obsEnd = self._tMapA.getEndFromSeqName("desc2") + self.assertEqual(expEnd, obsEnd) + + + def test_getMapListFromSeqName( self ): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc2\t1\t20\n" ) + map3 = Map() + map3.setFromString( "name2\tdesc2\t1\t50\n" ) + for m in [ map1, map2, map3 ]: self._tMapA.insert( m ) + lExp = [ map2, map3 ] + lObs = self._tMapA.getMapListFromSeqName("name2") + self.assertEqual( lObs, lExp ) + + def test_getMapListFromChr( self ): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tchr1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tchr2\t1\t20\n" ) + map3 = Map() + map3.setFromString( "name2\tchr2\t1\t50\n" ) + for m in [ map1, map2, map3 ]: self._tMapA.insert( m ) + lExp = [ map2, map3 ] + lObs = self._tMapA.getMapListFromChr("chr2") + self.assertEqual( lObs, lExp ) + + def test_getSeqNameList(self): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc2\t1\t20\n" ) + map3 = Map() + map3.setFromString( "name2\tdesc2\t1\t50\n" ) + for m in [ map1, map2, map3 ]: self._tMapA.insert( m ) + lExp = ["desc1", "desc2"] + lObs = self._tMapA.getSeqNameList() + self.assertEqual( lObs, lExp ) + + def test_insert_one_element( self ): + map2Insert = Map() + map2Insert.name="name1" + map2Insert.seqname="name2" + map2Insert.start=1L + map2Insert.end=50L + self._iDb.createTable( self._table, "map", "" ) + self._tMapA.insert( map2Insert ) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + expTmapTuple = (('name1', 'name2', 1L, 50L),) + obsTmapTuples = self._iDb.cursor.fetchall() + self.assertEquals( expTmapTuple, obsTmapTuples ) + + def test_insert_two_elements( self ): + map1 = Map() + map1.setFromString( "name1\tdesc1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc2\t1\t20\n" ) + self._iDb.createTable( self._table, "map", "" ) + for m in [ map1, map2 ]: self._tMapA.insert( m ) + expTmapTuple = ( ('name1', 'desc1', 1L, 120L), ('name2', 'desc2', 1L, 20L) ) + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._iDb.execute( sqlCmd ) + obsTmapTuples = self._iDb.cursor.fetchall() + self.assertEquals(expTmapTuple, obsTmapTuples ) + + def test_insertList( self ): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc2\t1\t20\n" ) + lmap = [ map1, map2 ] + self._tMapA.insertList( lmap ) + lExp = lmap + lObs = self._tMapA.getListOfAllMaps() + self.assertEqual( lObs, lExp ) + + def test_getSetListFromSeqName( self ): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc2\t1\t20\n" ) + map3 = Map() + map3.setFromString( "name2\tdesc2\t1\t50\n" ) + for m in [ map1, map2, map3 ]: self._tMapA.insert( m ) + explMap = [Set( 1,"name2", "desc2", 1, 20), Set( 2,"name2", "desc2", 1, 50)] + obslMap = self._tMapA.getSetListFromSeqName("name2") + self.assertEqual( explMap, obslMap ) + + def test_getMapListOverlappingCoord( self ): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t70\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc1\t1\t20\n" ) + map3 = Map() + map3.setFromString( "name3\tdesc1\t1\t50\n" ) + for m in [ map1, map2, map3 ]: self._tMapA.insert( m ) + explMap = [Map("name2", "desc1", 1, 20), Map("name3", "desc1", 1, 50)] + obslMap = self._tMapA.getMapListOverlappingCoord("desc1", 1, 60) + self.assertEqual( explMap, obslMap ) + + def test_getSetListOverlappingCoord( self ): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t70\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc1\t1\t20\n" ) + map3 = Map() + map3.setFromString( "name3\tdesc1\t1\t50\n" ) + for m in [ map1, map2, map3 ]: self._tMapA.insert( m ) + explSet = [Set(1, "name2", "desc1", 1, 20), Set(2, "name3", "desc1", 1, 50)] + obslSet = self._tMapA.getSetListOverlappingCoord("desc1", 1, 60) + self.assertEqual( explSet, obslSet ) + +################################################################################## +########################### Tests for other methods ############################## +################################################################################## + + def test_getListOfAllMaps( self ): + self._iDb.createTable( self._table, "map", "" ) + map1 = Map() + map1.setFromString( "name1\tdesc1\t1\t120\n" ) + map2 = Map() + map2.setFromString( "name2\tdesc2\t1\t20\n" ) + for m in [ map1, map2 ]: self._tMapA.insert( m ) + lExp = [ map1, map2 ] + lObs = self._tMapA.getListOfAllMaps() + self.assertEqual( lObs, lExp ) + + def test_getDictPerNameFromMapFile( self ): + self._iDb.createTable( self._table, "map", "" ) + iMap1 = Map( "chunk1", "chromosome1", 1, 100 ) + iMap2 = Map( "chunk2", "chromosome1", 91, 190 ) + iMap3 = Map( "chunk3", "chromosome2", 1, 100 ) + iMap4 = Map( "chunk1", "chromosome1", 1, 100 ) # redundant with iMap1 + for iMap in [ iMap1, iMap2, iMap3, iMap4 ]: + self._tMapA.insert( iMap ) + dExp = { "chunk1": iMap1, "chunk2": iMap2, "chunk3": iMap3 } + dObs = self._tMapA.getDictPerName() + self.assertEquals( dExp, dObs ) + +#TODO: Check getListFromSeqName method: uses name instead of seqname +# def test_getMapListFromSeqNameList( self ): +# self._iDb.createTable( self._table, "map", "" ) +# map1 = Map() +# map1.setFromString( "name1\tdesc1\t1\t120\n" ) +# map2 = Map() +# map2.setFromString( "name2\tdesc2\t1\t20\n" ) +# map3 = Map() +# map3.setFromString( "name3\tdesc2\t1\t10\n" ) +# map4 = Map() +# map4.setFromString( "name4\tdesc3\t10\t200\n" ) +# for m in [map1, map2, map3, map4]: self._tMapA.insert( m ) +# +# lMapToRetrieve = ["name1", "desc2"] +# lExp = [map1, map2, map3] +# lObs = self._tMapA.getMapListFromSeqNameList(lMapToRetrieve) +# self.assertEqual( lObs, lExp ) + +test_suite = unittest.TestSuite() +test_suite.addTest( unittest.makeSuite( Test_TableMapAdaptator ) ) +if __name__ == "__main__": + unittest.TextTestRunner(verbosity=2).run( test_suite )