Mercurial > repos > yufei-luo > s_mart
comparison smart_toolShed/commons/core/sql/test/Tst_F_RepetJob.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e0f8dcca02ed |
---|---|
1 import os | |
2 import time | |
3 import sys | |
4 import stat | |
5 import unittest | |
6 import glob | |
7 from commons.core.sql.DbMySql import DbMySql | |
8 from commons.core.sql.RepetJob import RepetJob | |
9 from commons.core.sql.Job import Job | |
10 | |
11 class Test_F_RepetJob(unittest.TestCase): | |
12 | |
13 def setUp(self): | |
14 self._jobTableName = "dummyJobTable" | |
15 self._db = DbMySql() | |
16 self._iRepetJob = RepetJob() | |
17 self._configFileName = "dummyConfigFile" | |
18 configF = open(self._configFileName, "w" ) | |
19 configF.write( "[repet_env]\n" ) | |
20 configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) ) | |
21 configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) ) | |
22 configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) ) | |
23 configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) ) | |
24 configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) ) | |
25 configF.close() | |
26 | |
27 def tearDown(self): | |
28 self._iRepetJob = None | |
29 self._db.dropTable( self._jobTableName ) | |
30 self._db.close() | |
31 os.remove(self._configFileName) | |
32 | |
33 def test_submitJob_with_multiple_jobs(self): | |
34 job1 = self._createJobInstance("job1") | |
35 self._createLauncherFile(job1) | |
36 | |
37 job2 = self._createJobInstance("job2") | |
38 self._createLauncherFile(job2) | |
39 | |
40 job3 = self._createJobInstance("job3") | |
41 self._createLauncherFile(job3) | |
42 | |
43 self._iRepetJob.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) | |
44 self._iRepetJob.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) | |
45 self._iRepetJob.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) | |
46 | |
47 time.sleep(70) | |
48 | |
49 expJobStatus = "finished" | |
50 obsJobStatus1 = self._iRepetJob.getJobStatus(job1) | |
51 obsJobStatus2 = self._iRepetJob.getJobStatus(job2) | |
52 obsJobStatus3 = self._iRepetJob.getJobStatus(job3) | |
53 | |
54 self.assertEquals(expJobStatus, obsJobStatus1) | |
55 self.assertEquals(expJobStatus, obsJobStatus2) | |
56 self.assertEquals(expJobStatus, obsJobStatus3) | |
57 | |
58 jobName1 = job1.jobname | |
59 jobName2 = job2.jobname | |
60 jobName3 = job3.jobname | |
61 | |
62 expErrorFilePrefix1 = jobName1+ ".e" | |
63 expOutputFilePrefix1 = jobName1 + ".o" | |
64 expErrorFilePrefix2 = jobName2 + ".e" | |
65 expOutputFilePrefix2 = jobName2 + ".o" | |
66 expErrorFilePrefix3 = jobName3 + ".e" | |
67 expOutputFilePrefix3 = jobName3 + ".o" | |
68 | |
69 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") | |
70 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") | |
71 lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*") | |
72 lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*") | |
73 lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*") | |
74 lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*") | |
75 | |
76 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) | |
77 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) | |
78 isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0) | |
79 isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0) | |
80 isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0) | |
81 isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0) | |
82 | |
83 os.system("rm launcherFileTest*.py *.e* *.o*") | |
84 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) | |
85 self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2) | |
86 self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3) | |
87 | |
88 def test_submitJob_job_already_submitted(self): | |
89 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
90 iJob = self._createJobInstance("job") | |
91 self._iRepetJob.recordJob(iJob) | |
92 | |
93 isSysExitRaised = False | |
94 try: | |
95 self._iRepetJob.submitJob(iJob) | |
96 except SystemExit: | |
97 isSysExitRaised = True | |
98 self.assertTrue(isSysExitRaised) | |
99 | |
100 def test_waitJobGroup_with_several_nbTimeOut_waiting(self): | |
101 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
102 iJob = self._createJobInstance("job") | |
103 self._createLauncherFile(iJob) | |
104 self._iRepetJob.recordJob(iJob) | |
105 self._iRepetJob.changeJobStatus(iJob, "running", "method") | |
106 | |
107 expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % ( iJob.jobid ) | |
108 | |
109 obsError = "obsError.txt" | |
110 obsErrorHandler = open(obsError, "w") | |
111 stderrRef = sys.stderr | |
112 sys.stderr = obsErrorHandler | |
113 | |
114 isSysExitRaised = False | |
115 try: | |
116 self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, timeOutPerJob=3) | |
117 except SystemExit: | |
118 isSysExitRaised = True | |
119 | |
120 obsErrorHandler.close() | |
121 | |
122 obsErrorHandler = open(obsError, "r") | |
123 obsMsg = obsErrorHandler.readline() | |
124 obsErrorHandler.close() | |
125 | |
126 sys.stderr = stderrRef | |
127 os.remove(obsError) | |
128 os.system("rm launcherFileTest*.py") | |
129 self.assertTrue(isSysExitRaised) | |
130 self.assertEquals(expMsg, obsMsg) | |
131 | |
132 def test_waitJobGroup_with_error_job_maxRelaunch_two(self): | |
133 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
134 iJob = self._createJobInstance("job") | |
135 self._createLauncherFile(iJob) | |
136 | |
137 self._iRepetJob.recordJob(iJob) | |
138 self._iRepetJob.changeJobStatus(iJob, "error", "method") | |
139 | |
140 self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 2) | |
141 | |
142 time.sleep(10) | |
143 | |
144 expJobStatus = "finished" | |
145 obsJobStatus1 = self._iRepetJob.getJobStatus(iJob) | |
146 | |
147 self.assertEquals(expJobStatus, obsJobStatus1) | |
148 | |
149 jobName = iJob.jobname | |
150 | |
151 expErrorFilePrefix1 = jobName + ".e" | |
152 expOutputFilePrefix1 = jobName + ".o" | |
153 | |
154 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") | |
155 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") | |
156 | |
157 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) | |
158 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) | |
159 | |
160 self._iRepetJob.removeJob(iJob) | |
161 os.system("rm launcherFileTest*.py *.e* *.o*") | |
162 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) | |
163 | |
164 | |
165 def test_isJobStillHandledBySge_True(self): | |
166 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
167 iJob = self._createJobInstance("job") | |
168 self._createLauncherFile(iJob) | |
169 self._iRepetJob.submitJob(iJob) | |
170 | |
171 isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname) | |
172 os.system("rm launcherFileTest*.py") | |
173 | |
174 self.assertTrue(isJobHandledBySge) | |
175 | |
176 def test_isJobStillHandledBySge_False(self): | |
177 self._iRepetJob.createTable(self._jobTableName, "jobs") | |
178 iJob = self._createJobInstance("job") | |
179 self._createLauncherFile(iJob) | |
180 self._iRepetJob.recordJob(iJob) | |
181 | |
182 isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname) | |
183 os.system("rm launcherFileTest*.py") | |
184 | |
185 self.assertFalse(isJobHandledBySge) | |
186 | |
187 def _createJobInstance(self, name): | |
188 return Job(self._jobTableName, 0, name, "test", "", "date;sleep 5;date", "./launcherFileTest_"+ name +".py") | |
189 | |
190 def _createLauncherFile(self, iJob): | |
191 jobFileHandler = open( iJob.launcher , "w" ) | |
192 | |
193 launcher = "#!/usr/bin/python\n" | |
194 launcher += "import os\n" | |
195 launcher += "import sys\n" | |
196 | |
197 launcher += "print \"system:\", os.uname()\n" | |
198 launcher += "sys.stdout.flush()\n" | |
199 newStatus = "running" | |
200 prg = "%s/bin/srptChangeJobStatus.py" % (os.environ["REPET_PATH"]) | |
201 cmd = prg | |
202 cmd += " -t %s" % ( iJob.tablename ) | |
203 cmd += " -n %s" % ( iJob.jobname ) | |
204 cmd += " -g %s" % ( iJob.groupid ) | |
205 if iJob.queue != "": | |
206 cmd += " -q %s" % ( iJob.queue ) | |
207 cmd += " -s %s" % ( newStatus ) | |
208 cmd += " -c %s" %( self._configFileName ) | |
209 cmd += " -v 1" | |
210 launcher +="os.system( \"" + cmd + "\" )\n" | |
211 | |
212 launcher += "print \"LAUNCH: "+ iJob.command + "\"\n" | |
213 launcher += "sys.stdout.flush()\n" | |
214 launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n" | |
215 launcher += "if exitStatus != 0:\n" | |
216 launcher += "\tprint \"ERROR: "+ iJob.command + " returned exit status '%i'\" % ( exitStatus )\n" | |
217 | |
218 newStatus = "finished" | |
219 prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py" | |
220 cmd = prg | |
221 cmd += " -t %s" % ( iJob.tablename ) | |
222 cmd += " -n %s" % ( iJob.jobname ) | |
223 cmd += " -g %s" % ( iJob.groupid ) | |
224 if iJob.queue != "": | |
225 cmd += " -q %s" % ( iJob.queue ) | |
226 cmd += " -s %s" % ( newStatus ) | |
227 cmd += " -c %s" %( self._configFileName ) | |
228 cmd += " -v 1" | |
229 launcher +="os.system( \"" + cmd + "\" )\n" | |
230 launcher += "sys.exit(0)\n" | |
231 jobFileHandler.write(launcher) | |
232 jobFileHandler.close() | |
233 os.chmod( iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC ) | |
234 | |
235 if __name__ == "__main__": | |
236 unittest.main() |