18
|
1 import unittest
|
|
2 import time
|
|
3 import os
|
|
4 from commons.core.sql.TablePathAdaptator import TablePathAdaptator
|
|
5 from commons.pyRepetUnit.convCoord.PathChunkConnector import PathChunkConnector
|
|
6 from commons.core.sql.DbMySql import DbMySql
|
|
7
|
|
8
|
|
class Test_PathChunkConnector( unittest.TestCase ):
    """Database-backed tests for PathChunkConnector.

    Connection settings are read from the REPET_HOST, REPET_USER, REPET_PW,
    REPET_DB and REPET_PORT environment variables. Each test loads a small
    path file into a temporary table, runs the connector against a
    three-chunk map file and compares the rows found in the table afterwards
    with the expected tuples.
    """

    # Number of fields compared for each row; expected tuples hold exactly
    # this many values.
    _NB_COMPARED_FIELDS = 9

    def setUp(self):
        """Write a temporary config file and open the database connection.

        Raises:
            KeyError: if one of the REPET_* environment variables is not set.
        """
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S"), os.getpid() )
        self._configFileName = "dummyConfigFile_%s" % ( self._uniqId )

        # Read the environment before touching the disk, so that a missing
        # variable does not leave a half-written config file behind.
        configLines = [ "[repet_env]\n" ]
        for key in ( "host", "user", "pw", "db", "port" ):
            value = os.environ[ "REPET_%s" % ( key.upper() ) ]
            configLines.append( "repet_%s: %s\n" % ( key, value ) )
        with open( self._configFileName, "w" ) as configF:
            configF.writelines( configLines )

        self._db = DbMySql( cfgFileName=self._configFileName )
        self._table = "dummyPathTable_%s" % ( self._uniqId )
        self._tpA = TablePathAdaptator( self._db, self._table )
        self._mapDict = {
            'chunk1': ('dmel_chr4', 1, 100000),
            'chunk2': ('dmel_chr4', 90001, 190000),
            'chunk3': ('dmel_chr4', 180001, 280000),
        }
        self._mapFileName = "map_file.map"

    def tearDown(self):
        """Close the database connection and delete the temporary config file."""
        try:
            self._db.close()
        finally:
            # The config file is created in setUp; remove it even when
            # closing the connection fails.
            if os.path.exists( self._configFileName ):
                os.remove( self._configFileName )

    def testTwoQueryOverlapsOnPlusStrand(self):
        """Two overlapping matches with increasing subject coordinates.

        A single row spanning both matches is expected.
        """
        lines = [
            "1\tdmel_chr4\t95535\t95570\tsbj2\t125423\t125467\t7e-15\t82\t97.78\n",
            "2\tdmel_chr4\t95545\t95576\tsbj2\t125457\t133465\t2e-38\t83\t65\n",
        ]
        expectedList = [
            ("dmel_chr4", 95535, 95576, "sbj2", 125423, 133465, 2e-38, 83, 97.78),
        ]
        self._templateTest( lines, expectedList )

    def testTwoQueryOverlapsOnReverseStrand(self):
        """Two overlapping matches with decreasing subject coordinates.

        A single row spanning both matches is expected.
        """
        lines = [
            "1\tdmel_chr4\t95535\t95570\tsbj2\t125467\t125423\t7e-15\t82\t97.78\n",
            "2\tdmel_chr4\t95545\t95576\tsbj2\t133465\t125457\t2e-38\t83\t65\n",
        ]
        expectedList = [
            ("dmel_chr4", 95535, 95576, "sbj2", 133465, 125423, 2e-38, 83, 97.78),
        ]
        self._templateTest( lines, expectedList )

    def testTwoQueryOverlapsOnDifferentStrands(self):
        """Two overlapping matches whose subject orientations differ.

        Both rows are expected to be found unchanged.
        """
        lines = [
            "1\tdmel_chr4\t95535\t95570\tsbj2\t125423\t125467\t7e-15\t82\t97.78\n",
            "2\tdmel_chr4\t95545\t95576\tsbj2\t133465\t125457\t2e-38\t83\t65\n",
        ]
        expectedList = [
            ("dmel_chr4", 95535, 95570, "sbj2", 125423, 125467, 7e-15, 82, 97.78),
            ("dmel_chr4", 95545, 95576, "sbj2", 133465, 125457, 2e-38, 83, 65),
        ]
        self._templateTest( lines, expectedList )

    def _templateTest(self, datas2TestList, expectedList ):
        """Load the given path lines, run the connector and check the table.

        Args:
            datas2TestList: tab-separated path lines written to the input file.
            expectedList: expected rows, in table order, without the leading
                column of each row.

        The table and both fixture files are removed even when the connector
        or an assertion fails, so a failing test does not pollute later runs.

        NOTE(review): only the first len(expectedList) rows are compared; any
        additional row in the table goes unnoticed. Confirm whether the row
        count should be asserted as well.
        """
        pathFileName = "dummyPathFile_%s" % ( self._uniqId )
        _MockPathFile( pathFileName, datas2TestList )
        try:
            self._db.createTable( self._table, "path", pathFileName )
            try:
                _MockMapFile( self._mapFileName )
                chunkConnector = PathChunkConnector( self._mapFileName, self._db, self._table, 0 )
                chunkConnector.run()

                sql_cmd = 'select * from %s' % (self._table)
                self._db.execute( sql_cmd )
                res = self._db.fetchall()

                for i, expectedTuple in enumerate( expectedList ):
                    self._assertExpectedTupleEqualsObsTuple( expectedTuple, res[i] )
            finally:
                self._db.dropTable( self._table )
        finally:
            for fileName in ( pathFileName, self._mapFileName ):
                if os.path.exists( fileName ):
                    os.remove( fileName )

    def _assertExpectedTupleEqualsObsTuple(self, expectedTuple, resultTuple):
        """Assert that an expected tuple matches a fetched table row.

        The first column of the fetched row is skipped: expectedTuple[i] is
        compared with resultTuple[i + 1] for each of the nine fields.
        """
        for i in range( self._NB_COMPARED_FIELDS ):
            self.assertEqual( expectedTuple[i], resultTuple[i + 1] )
|
|
94
|
|
95
|
|
class _MockPathFile:
    """Test fixture that writes the given lines to a path file on creation."""

    def __init__(self, fileName, lines):
        """Create (or overwrite) fileName and write each item of lines to it.

        Args:
            fileName: path of the file to write.
            lines: iterable of strings, written as-is (no newline is added).
        """
        # The original ended with `path.close` (no call), so the file was
        # never explicitly closed; the with-block guarantees flush and close
        # before the file is loaded by the caller.
        with open(fileName, "w") as pathFile:
            for line in lines:
                pathFile.write(line)
|
|
103
|
|
104
|
|
class _MockMapFile:
    """Test fixture that writes a fixed three-chunk map file on creation."""

    def __init__ (self, fileName):
        """Create (or overwrite) fileName with three tab-separated map lines.

        Each line holds a chunk name, a sequence name, a start and an end.

        Args:
            fileName: path of the map file to write.
        """
        mapLines = [
            "chunk1\tdmel_chr4\t1\t100000\n",
            "chunk2\tdmel_chr4\t90001\t190000\n",
            "chunk3\tdmel_chr4\t180001\t280000\n",
        ]
        # The original ended with `map.close` (no call), so the file was
        # never explicitly closed; the with-block guarantees flush and close.
        # The handle is also no longer named `map`, which shadowed the builtin.
        with open(fileName, "w") as mapFile:
            mapFile.writelines(mapLines)
|
|
116
|
|
117
|
|
# Module-level suite holding every test method of Test_PathChunkConnector.
# TestLoader.loadTestsFromTestCase replaces the deprecated unittest.makeSuite
# and collects the same "test"-prefixed methods.
test_suite = unittest.TestSuite()
test_suite.addTest( unittest.TestLoader().loadTestsFromTestCase( Test_PathChunkConnector ) )

if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run( test_suite )
|