Mercurial > repos > yufei-luo > s_mart
comparison smart_toolShed/commons/core/sql/test/Test_TableSetAdaptator.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox that manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e0f8dcca02ed |
---|---|
1 # Copyright INRA (Institut National de la Recherche Agronomique) | |
2 # http://www.inra.fr | |
3 # http://urgi.versailles.inra.fr | |
4 # | |
5 # This software is governed by the CeCILL license under French law and | |
6 # abiding by the rules of distribution of free software. You can use, | |
7 # modify and/ or redistribute the software under the terms of the CeCILL | |
8 # license as circulated by CEA, CNRS and INRIA at the following URL | |
9 # "http://www.cecill.info". | |
10 # | |
11 # As a counterpart to the access to the source code and rights to copy, | |
12 # modify and redistribute granted by the license, users are provided only | |
13 # with a limited warranty and the software's author, the holder of the | |
14 # economic rights, and the successive licensors have only limited | |
15 # liability. | |
16 # | |
17 # In this respect, the user's attention is drawn to the risks associated | |
18 # with loading, using, modifying and/or developing or reproducing the | |
19 # software by the user in light of its specific status of free software, | |
20 # that may mean that it is complicated to manipulate, and that also | |
21 # therefore means that it is reserved for developers and experienced | |
22 # professionals having in-depth computer knowledge. Users are therefore | |
23 # encouraged to load and test the software's suitability as regards their | |
24 # requirements in conditions enabling the security of their systems and/or | |
25 # data to be ensured and, more generally, to use and operate it in the | |
26 # same conditions as regards security. | |
27 # | |
28 # The fact that you are presently reading this means that you have had | |
29 # knowledge of the CeCILL license and that you accept its terms. | |
30 | |
31 | |
32 import unittest | |
33 import time | |
34 import os | |
35 from commons.core.sql.TableSetAdaptator import TableSetAdaptator | |
36 from commons.core.sql.DbMySql import DbMySql | |
37 from commons.core.coord.Set import Set | |
38 | |
39 | |
class Test_TableSetAdaptator( unittest.TestCase ):
    """Integration tests for TableSetAdaptator against a MySQL database.

    Connection settings are read from the REPET_HOST, REPET_USER, REPET_PW,
    REPET_DB and REPET_PORT environment variables and written to a throw-away
    configuration file. Each test works on its own uniquely named table,
    which is dropped again in tearDown.
    """

    def setUp( self ):
        """Write the dummy config file, connect to the database and build the adaptator."""
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )
        self._configFileName = "dummyConfigFile_%s" % ( self._uniqId )
        # Read every environment variable before creating the file, so that a
        # missing variable does not leave a half-written file behind.
        lConfigLines = [ "[repet_env]\n",
                         "repet_host: %s\n" % ( os.environ["REPET_HOST"] ),
                         "repet_user: %s\n" % ( os.environ["REPET_USER"] ),
                         "repet_pw: %s\n" % ( os.environ["REPET_PW"] ),
                         "repet_db: %s\n" % ( os.environ["REPET_DB"] ),
                         "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) ]
        with open( self._configFileName, "w" ) as configF:
            configF.writelines( lConfigLines )
        self._iDb = None
        try:
            self._iDb = DbMySql( cfgFileName=self._configFileName )
            self._table = "dummySetTable_%s" % ( self._uniqId )
            self._tSetA = TableSetAdaptator( self._iDb, self._table )
        except Exception:
            # unittest does not run tearDown when setUp fails: clean up here.
            if self._iDb is not None:
                self._iDb.close()
            os.remove( self._configFileName )
            raise

    def tearDown( self ):
        """Drop the test table, close the connection and delete the config file."""
        try:
            try:
                self._iDb.dropTable( self._table )
            finally:
                # Close the connection even if the drop failed.
                self._iDb.close()
        finally:
            # Always remove the config file and reset the fixture attributes.
            os.remove( self._configFileName )
            self._uniqId = None
            self._table = None
            self._tSetA = None
            self._configFileName = ""

    def _makeSet( self, line ):
        """Return a Set initialised from a tab-separated line via setFromString()."""
        iSet = Set()
        iSet.setFromString( line )
        return iSet

    def _fetchAllRows( self ):
        """Run 'SELECT *' on the test table and return everything the cursor fetched."""
        sqlCmd = "SELECT * FROM %s" % ( self._table )
        self._iDb.execute( sqlCmd )
        return self._iDb.cursor.fetchall()

    def test_insert(self):
        """insert() of one Set gives exactly one matching row."""
        set2Insert = Set()
        set2Insert.id = 1
        set2Insert.name = "name1"
        set2Insert.seqname = "name2"
        # Plain ints: equal to the former long literals on Python 2, valid on Python 3.
        set2Insert.start = 1
        set2Insert.end = 50
        self._iDb.createTable( self._table, "set", "" )
        self._tSetA.insert( set2Insert, False )
        expTsetTuple = ((1, "name1", "name2", 1, 50),)
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual( expTsetTuple, obsTsetTuples )

    def test_insertList ( self ):
        """insertList() of two Sets gives two matching rows."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2 ]
        self._tSetA.insertList( lset )
        expTsetTuple = ((1, "name1", "desc1", 1, 120), (2, "name2", "desc2", 1, 20))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual( expTsetTuple, obsTsetTuples )

    def test_getIdList(self):
        """getIdList() returns the identifier of the single inserted Set first."""
        set2Insert = Set()
        set2Insert.id = 1
        set2Insert.name = "name1"
        set2Insert.seqname = "name2"
        set2Insert.start = 1
        set2Insert.end = 50
        self._iDb.createTable( self._table, "set", "" )
        self._tSetA.insert( set2Insert )
        lIds = self._tSetA.getIdList()
        self.assertEqual( set2Insert.id, lIds[0] )

    def test_getSeqNameList(self):
        """getSeqNameList() lists each sequence name once, although 'desc2' is inserted twice."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        set3 = self._makeSet( "2\tname2\tdesc2\t1\t50\n" )
        for m in [ set1, set2, set3 ]:
            self._tSetA.insert( m )
        lExp = ["desc1", "desc2"]
        lObs = self._tSetA.getSeqNameList()
        self.assertEqual( lObs, lExp )

    def test_getSetListFromSeqName(self):
        """getSetListFromSeqName('desc2') returns only the two Sets on 'desc2'."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        set3 = self._makeSet( "3\tname2\tdesc2\t1\t50\n" )
        for m in [ set1, set2, set3 ]:
            self._tSetA.insert( m )
        explSet = [Set( 2,"name2", "desc2", 1, 20), Set( 3,"name2", "desc2", 1, 50)]
        obslSet = self._tSetA.getSetListFromSeqName("desc2")
        self.assertEqual( explSet, obslSet )

    def test_getSetListFromId(self):
        """getSetListFromId(2) returns only the Set whose identifier is 2."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2 ]
        self._tSetA.insertList( lset )
        explSet = [Set( 2,"name2", "desc2", 1, 20)]
        obslSet = self._tSetA.getSetListFromId(2)
        self.assertEqual( explSet, obslSet )

    def test_getSetListFromIdList(self):
        """getSetListFromIdList([1, 2]) returns both inserted Sets."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2 ]
        self._tSetA.insertList( lset )
        explSet = [Set( 1, "name1", "desc1", 1, 120), Set( 2,"name2", "desc2", 1, 20)]
        lId = [1, 2]
        obslSet = self._tSetA.getSetListFromIdList(lId)
        self.assertEqual( explSet, obslSet )

    def test_getSetListFromIdList_emptyList(self):
        """getSetListFromIdList([]) returns an empty list."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2 ]
        self._tSetA.insertList( lset )
        explSet = []
        lId = []
        obslSet = self._tSetA.getSetListFromIdList(lId)
        self.assertEqual( explSet, obslSet )

    def test_getSetListOverlappingCoord(self):
        """Only the 1-120 Set on 'desc2' is returned for the range 30-70."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc2\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2 ]
        self._tSetA.insertList( lset )
        explSet = [Set( 1,"name1", "desc2", 1, 120)]
        obslSet = self._tSetA.getSetListOverlappingCoord("desc2", 30, 70)
        self.assertEqual( explSet, obslSet )

    def test_deleteFromId(self):
        """deleteFromId(1) removes the row with identifier 1 and keeps the others."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        set3 = self._makeSet( "3\tname2\tdesc3\t1\t50\n" )
        for m in [ set1, set2, set3 ]:
            self._tSetA.insert( m )
        self._tSetA.deleteFromId(1)
        expTSetTuples = (( 2,"name2", "desc2", 1, 20), ( 3,"name2", "desc3", 1, 50))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual( expTSetTuples, obsTsetTuples )

    def test_deleteFromIdList(self):
        """deleteFromIdList([1, 2]) leaves only the row with identifier 3."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        set3 = self._makeSet( "3\tname2\tdesc3\t1\t50\n" )
        for m in [ set1, set2, set3 ]:
            self._tSetA.insert( m )
        lId2del = [1, 2]
        self._tSetA.deleteFromIdList(lId2del)
        expTSetTuples = (( 3,"name2", "desc3", 1, 50),)
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual( expTSetTuples, obsTsetTuples )

    def test_deleteFromIdListWithEmptyList(self):
        """deleteFromIdList([]) leaves all three rows in place."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        set3 = self._makeSet( "3\tname2\tdesc3\t1\t50\n" )
        for m in [ set1, set2, set3 ]:
            self._tSetA.insert( m )
        lId2del = []
        self._tSetA.deleteFromIdList(lId2del)
        expTSetTuples = ((1, 'name1', 'desc1', 1, 120), (2, 'name2', 'desc2', 1, 20), (3, 'name2', 'desc3', 1, 50))
        obsTsetTuples = self._fetchAllRows()
        self.assertEqual( expTSetTuples, obsTsetTuples )

    def test_joinTwoSets(self):
        """joinTwoSets(5, 2) leaves both rows carrying identifier 2."""
        self._iDb.createTable( self._table, "set", "" )
        idSet1 = 5
        set1 = self._makeSet( "5\tname1\tdesc1\t1\t120\n" )
        idSet2 = 2
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2 ]
        self._tSetA.insertList( lset )
        self._tSetA.joinTwoSets(idSet1, idSet2)
        expTSetTuples = ((2, "name1", "desc1", 1, 120 ), (2, "name2", "desc2", 1, 20 ))
        obsTSetTuples = self._fetchAllRows()
        # The table is dropped by tearDown; no explicit drop is needed here.
        self.assertEqual( expTSetTuples, obsTSetTuples)

    def test_joinTwoSetsWhereId1InfId2(self):
        """joinTwoSets(2, 5) also leaves both rows carrying identifier 2."""
        self._iDb.createTable( self._table, "set", "" )
        idSet1 = 2
        set1 = self._makeSet( "5\tname1\tdesc1\t1\t120\n" )
        idSet2 = 5
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2 ]
        self._tSetA.insertList( lset )
        self._tSetA.joinTwoSets(idSet1, idSet2)
        expTSetTuples = ((2, "name1", "desc1", 1, 120 ), (2, "name2", "desc2", 1, 20 ))
        obsTSetTuples = self._fetchAllRows()
        self.assertEqual( expTSetTuples, obsTSetTuples)

    def test_getNewId(self):
        """getNewId() returns 9 when the stored identifiers are 1, 2, 5 and 8."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = self._makeSet( "1\tname1\tdesc1\t1\t120\n" )
        set2 = self._makeSet( "2\tname2\tdesc2\t1\t20\n" )
        set3 = self._makeSet( "5\tname1\tdesc1\t1\t120\n" )
        set4 = self._makeSet( "8\tname2\tdesc2\t1\t20\n" )
        lset = [ set1, set2, set3, set4 ]
        self._tSetA.insertList( lset )
        expId = 9
        obsId = self._tSetA.getNewId()
        self.assertEqual( expId, obsId)

    def test_getNewId_set_null(self):
        """getNewId() returns 1 when only a default-constructed Set was inserted."""
        self._iDb.createTable( self._table, "set", "" )
        set1 = Set()
        lset = [ set1 ]
        self._tSetA.insertList( lset )
        expId = 1
        obsId = self._tSetA.getNewId()
        self.assertEqual( expId, obsId)

    def test_getListOfAllSets( self ):
        """getListOfAllSets() returns the inserted Sets in insertion order."""
        self._iDb.createTable( self._table, "set" )
        s1 = self._makeSet( "1\tchr1\tTE3\t1\t10\n" )
        s2a = self._makeSet( "2\tchr1\tTE2\t2\t9\n" )
        s2b = self._makeSet( "2\tchr1\tTE2\t12\t19\n" )
        lSets = [ s1, s2a, s2b ]
        self._tSetA.insertList( lSets )
        expLSets = [ s1, s2a, s2b ]
        obsLSets = self._tSetA.getListOfAllSets()
        self.assertEqual( expLSets, obsLSets )

    def test_getListOfAllSets_empty_table( self ):
        """getListOfAllSets() returns an empty list for an empty table."""
        self._iDb.createTable( self._table, "set" )
        expList = []
        obsList = self._tSetA.getListOfAllSets()
        self.assertEqual( expList, obsList )
326 | |
# Module-level suite holding every test of Test_TableSetAdaptator, so the
# module can be run directly as a script.
# TestLoader.loadTestsFromTestCase replaces unittest.makeSuite, which is
# deprecated since Python 3.11 and removed in Python 3.13.
test_suite = unittest.TestSuite()
test_suite.addTest( unittest.TestLoader().loadTestsFromTestCase( Test_TableSetAdaptator ) )
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run( test_suite )