Mercurial > repos > yufei-luo > s_mart
comparison commons/core/sql/test/Test_TableMapAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:ea3082881bf8 | 6:769e306b7933 |
---|---|
1 # Copyright INRA (Institut National de la Recherche Agronomique) | |
2 # http://www.inra.fr | |
3 # http://urgi.versailles.inra.fr | |
4 # | |
5 # This software is governed by the CeCILL license under French law and | |
6 # abiding by the rules of distribution of free software. You can use, | |
7 # modify and/ or redistribute the software under the terms of the CeCILL | |
8 # license as circulated by CEA, CNRS and INRIA at the following URL | |
9 # "http://www.cecill.info". | |
10 # | |
11 # As a counterpart to the access to the source code and rights to copy, | |
12 # modify and redistribute granted by the license, users are provided only | |
13 # with a limited warranty and the software's author, the holder of the | |
14 # economic rights, and the successive licensors have only limited | |
15 # liability. | |
16 # | |
17 # In this respect, the user's attention is drawn to the risks associated | |
18 # with loading, using, modifying and/or developing or reproducing the | |
19 # software by the user in light of its specific status of free software, | |
20 # that may mean that it is complicated to manipulate, and that also | |
21 # therefore means that it is reserved for developers and experienced | |
22 # professionals having in-depth computer knowledge. Users are therefore | |
23 # encouraged to load and test the software's suitability as regards their | |
24 # requirements in conditions enabling the security of their systems and/or | |
25 # data to be ensured and, more generally, to use and operate it in the | |
26 # same conditions as regards security. | |
27 # | |
28 # The fact that you are presently reading this means that you have had | |
29 # knowledge of the CeCILL license and that you accept its terms. | |
30 | |
31 | |
32 import unittest | |
33 import time | |
34 import os | |
35 from commons.core.sql.TableMapAdaptator import TableMapAdaptator | |
36 from commons.core.sql.DbMySql import DbMySql | |
37 from commons.core.coord.Map import Map | |
38 from commons.core.coord.Set import Set | |
39 | |
40 | |
class Test_TableMapAdaptator(unittest.TestCase):
    """Integration tests for TableMapAdaptator.

    Each test works on its own uniquely named "map" table, reached through a
    DbMySql connection.  The connection settings are taken from the
    environment variables listed in _ENV_KEYS and handed to DbMySql through a
    temporary configuration file written in the current directory.
    """

    # Environment variables holding the connection settings.  Each one is
    # written to the config file under its lower-cased name.
    _ENV_KEYS = ("REPET_HOST", "REPET_USER", "REPET_PW", "REPET_DB", "REPET_PORT")

    def setUp(self):
        """Write the temporary config file, connect, and build the adaptator.

        The test is skipped (instead of failing on a KeyError) when one of
        the required environment variables is not set.
        """
        # Check the environment before touching the file system so that a
        # missing variable cannot leave a half-written config file behind.
        lMissing = [key for key in self._ENV_KEYS if key not in os.environ]
        if lMissing:
            self.skipTest("missing environment variable(s): %s" % ", ".join(lMissing))

        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._configFileName = "dummyConfigFile_%s" % (self._uniqId)
        with open(self._configFileName, "w") as configF:
            configF.write("[repet_env]\n")
            for key in self._ENV_KEYS:
                configF.write("%s: %s\n" % (key.lower(), os.environ[key]))

        # tearDown() is not run when setUp() fails, so remove the config file
        # here if the connection cannot be established.
        try:
            self._iDb = DbMySql(cfgFileName=self._configFileName)
        except Exception:
            os.remove(self._configFileName)
            raise
        self._table = "dummyMapTable_%s" % (self._uniqId)
        self._tMapA = TableMapAdaptator(self._iDb, self._table)

    def tearDown(self):
        """Drop the test table, close the connection and delete the config file.

        Every cleanup step runs even if an earlier one raises; the first
        exception still propagates to the test runner.
        """
        try:
            try:
                self._iDb.dropTable(self._table)
            finally:
                self._iDb.close()
        finally:
            os.remove(self._configFileName)
            self._uniqId = None
            self._table = None
            self._tMapA = None
            self._configFileName = ""

    ##################################################################################
    ############################## Private helpers ###################################
    ##################################################################################

    @staticmethod
    def _mapFromString(line):
        """Return a Map initialised from a tab-separated line via setFromString()."""
        iMap = Map()
        iMap.setFromString(line)
        return iMap

    def _createTableWithMaps(self, lMaps):
        """Create the test "map" table and insert each Map of lMaps, in order."""
        self._iDb.createTable(self._table, "map", "")
        for iMap in lMaps:
            self._tMapA.insert(iMap)

    ##################################################################################
    ################## Tests for methods in ITableMapAdaptator #######################
    ##################################################################################

    def test_getEndFromSeqName(self):
        map1 = self._mapFromString("name1\tdesc1\t1\t120\n")
        map2 = self._mapFromString("name2\tdesc2\t1\t20\n")
        self._createTableWithMaps([map1, map2])
        expEnd = 20
        obsEnd = self._tMapA.getEndFromSeqName("desc2")
        self.assertEqual(expEnd, obsEnd)

    def test_getMapListFromSeqName(self):
        map1 = self._mapFromString("name1\tdesc1\t1\t120\n")
        map2 = self._mapFromString("name2\tdesc2\t1\t20\n")
        map3 = self._mapFromString("name2\tdesc2\t1\t50\n")
        self._createTableWithMaps([map1, map2, map3])
        lExp = [map2, map3]
        lObs = self._tMapA.getMapListFromSeqName("name2")
        self.assertEqual(lObs, lExp)

    def test_getMapListFromChr(self):
        map1 = self._mapFromString("name1\tchr1\t1\t120\n")
        map2 = self._mapFromString("name2\tchr2\t1\t20\n")
        map3 = self._mapFromString("name2\tchr2\t1\t50\n")
        self._createTableWithMaps([map1, map2, map3])
        lExp = [map2, map3]
        lObs = self._tMapA.getMapListFromChr("chr2")
        self.assertEqual(lObs, lExp)

    def test_getSeqNameList(self):
        map1 = self._mapFromString("name1\tdesc1\t1\t120\n")
        map2 = self._mapFromString("name2\tdesc2\t1\t20\n")
        map3 = self._mapFromString("name2\tdesc2\t1\t50\n")
        self._createTableWithMaps([map1, map2, map3])
        lExp = ["desc1", "desc2"]
        lObs = self._tMapA.getSeqNameList()
        self.assertEqual(lObs, lExp)

    def test_insert_one_element(self):
        # Plain ints are used instead of Python-2-only long literals; in
        # Python 2 an int compares equal to the long of the same value.
        map2Insert = Map()
        map2Insert.name = "name1"
        map2Insert.seqname = "name2"
        map2Insert.start = 1
        map2Insert.end = 50
        self._createTableWithMaps([map2Insert])
        sqlCmd = "SELECT * FROM %s" % (self._table)
        self._iDb.execute(sqlCmd)
        expTmapTuple = (('name1', 'name2', 1, 50),)
        obsTmapTuples = self._iDb.cursor.fetchall()
        self.assertEqual(expTmapTuple, obsTmapTuples)

    def test_insert_two_elements(self):
        map1 = self._mapFromString("name1\tdesc1\t1\t120\n")
        map2 = self._mapFromString("name2\tdesc2\t1\t20\n")
        self._createTableWithMaps([map1, map2])
        expTmapTuple = (('name1', 'desc1', 1, 120), ('name2', 'desc2', 1, 20))
        sqlCmd = "SELECT * FROM %s" % (self._table)
        self._iDb.execute(sqlCmd)
        obsTmapTuples = self._iDb.cursor.fetchall()
        self.assertEqual(expTmapTuple, obsTmapTuples)

    def test_insertList(self):
        self._iDb.createTable(self._table, "map", "")
        map1 = self._mapFromString("name1\tdesc1\t1\t120\n")
        map2 = self._mapFromString("name2\tdesc2\t1\t20\n")
        lmap = [map1, map2]
        self._tMapA.insertList(lmap)
        lExp = lmap
        lObs = self._tMapA.getListOfAllMaps()
        self.assertEqual(lObs, lExp)

    def test_getSetListFromSeqName(self):
        map1 = self._mapFromString("name1\tdesc1\t1\t120\n")
        map2 = self._mapFromString("name2\tdesc2\t1\t20\n")
        map3 = self._mapFromString("name2\tdesc2\t1\t50\n")
        self._createTableWithMaps([map1, map2, map3])
        explMap = [Set(1, "name2", "desc2", 1, 20), Set(2, "name2", "desc2", 1, 50)]
        obslMap = self._tMapA.getSetListFromSeqName("name2")
        self.assertEqual(explMap, obslMap)

    def test_getMapListOverlappingCoord(self):
        map1 = self._mapFromString("name1\tdesc1\t70\t120\n")
        map2 = self._mapFromString("name2\tdesc1\t1\t20\n")
        map3 = self._mapFromString("name3\tdesc1\t1\t50\n")
        self._createTableWithMaps([map1, map2, map3])
        explMap = [Map("name2", "desc1", 1, 20), Map("name3", "desc1", 1, 50)]
        obslMap = self._tMapA.getMapListOverlappingCoord("desc1", 1, 60)
        self.assertEqual(explMap, obslMap)

    def test_getSetListOverlappingCoord(self):
        map1 = self._mapFromString("name1\tdesc1\t70\t120\n")
        map2 = self._mapFromString("name2\tdesc1\t1\t20\n")
        map3 = self._mapFromString("name3\tdesc1\t1\t50\n")
        self._createTableWithMaps([map1, map2, map3])
        explSet = [Set(1, "name2", "desc1", 1, 20), Set(2, "name3", "desc1", 1, 50)]
        obslSet = self._tMapA.getSetListOverlappingCoord("desc1", 1, 60)
        self.assertEqual(explSet, obslSet)

    ##################################################################################
    ########################### Tests for other methods ##############################
    ##################################################################################

    def test_getListOfAllMaps(self):
        map1 = self._mapFromString("name1\tdesc1\t1\t120\n")
        map2 = self._mapFromString("name2\tdesc2\t1\t20\n")
        self._createTableWithMaps([map1, map2])
        lExp = [map1, map2]
        lObs = self._tMapA.getListOfAllMaps()
        self.assertEqual(lObs, lExp)

    def test_getDictPerNameFromMapFile(self):
        iMap1 = Map("chunk1", "chromosome1", 1, 100)
        iMap2 = Map("chunk2", "chromosome1", 91, 190)
        iMap3 = Map("chunk3", "chromosome2", 1, 100)
        iMap4 = Map("chunk1", "chromosome1", 1, 100)  # redundant with iMap1
        self._createTableWithMaps([iMap1, iMap2, iMap3, iMap4])
        dExp = {"chunk1": iMap1, "chunk2": iMap2, "chunk3": iMap3}
        dObs = self._tMapA.getDictPerName()
        self.assertEqual(dExp, dObs)

    # TODO: Check getListFromSeqName method: uses name instead of seqname
    # def test_getMapListFromSeqNameList( self ):
    #     self._iDb.createTable( self._table, "map", "" )
    #     map1 = Map()
    #     map1.setFromString( "name1\tdesc1\t1\t120\n" )
    #     map2 = Map()
    #     map2.setFromString( "name2\tdesc2\t1\t20\n" )
    #     map3 = Map()
    #     map3.setFromString( "name3\tdesc2\t1\t10\n" )
    #     map4 = Map()
    #     map4.setFromString( "name4\tdesc3\t10\t200\n" )
    #     for m in [map1, map2, map3, map4]: self._tMapA.insert( m )
    #
    #     lMapToRetrieve = ["name1", "desc2"]
    #     lExp = [map1, map2, map3]
    #     lObs = self._tMapA.getMapListFromSeqNameList(lMapToRetrieve)
    #     self.assertEqual( lObs, lExp )
246 | |
# Collect every test_* method of Test_TableMapAdaptator into a module-level
# suite.  TestLoader.loadTestsFromTestCase replaces the deprecated
# unittest.makeSuite helper.
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test_TableMapAdaptator))
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(test_suite)