comparison commons/core/sql/test/Tst_F_RepetJob.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 import os
2 import time
3 import sys
4 import stat
5 import unittest
6 import glob
7 from commons.core.sql.DbMySql import DbMySql
8 from commons.core.sql.RepetJob import RepetJob
9 from commons.core.sql.Job import Job
10
11 class Test_F_RepetJob(unittest.TestCase):
12
13 def setUp(self):
14 self._jobTableName = "dummyJobTable"
15 self._db = DbMySql()
16 self._iRepetJob = RepetJob()
17 self._configFileName = "dummyConfigFile"
18 configF = open(self._configFileName, "w" )
19 configF.write( "[repet_env]\n" )
20 configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) )
21 configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) )
22 configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) )
23 configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) )
24 configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) )
25 configF.close()
26
27 def tearDown(self):
28 self._iRepetJob = None
29 self._db.dropTable( self._jobTableName )
30 self._db.close()
31 os.remove(self._configFileName)
32
33 def test_submitJob_with_multiple_jobs(self):
34 job1 = self._createJobInstance("job1")
35 self._createLauncherFile(job1)
36
37 job2 = self._createJobInstance("job2")
38 self._createLauncherFile(job2)
39
40 job3 = self._createJobInstance("job3")
41 self._createLauncherFile(job3)
42
43 self._iRepetJob.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
44 self._iRepetJob.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
45 self._iRepetJob.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
46
47 time.sleep(70)
48
49 expJobStatus = "finished"
50 obsJobStatus1 = self._iRepetJob.getJobStatus(job1)
51 obsJobStatus2 = self._iRepetJob.getJobStatus(job2)
52 obsJobStatus3 = self._iRepetJob.getJobStatus(job3)
53
54 self.assertEquals(expJobStatus, obsJobStatus1)
55 self.assertEquals(expJobStatus, obsJobStatus2)
56 self.assertEquals(expJobStatus, obsJobStatus3)
57
58 jobName1 = job1.jobname
59 jobName2 = job2.jobname
60 jobName3 = job3.jobname
61
62 expErrorFilePrefix1 = jobName1+ ".e"
63 expOutputFilePrefix1 = jobName1 + ".o"
64 expErrorFilePrefix2 = jobName2 + ".e"
65 expOutputFilePrefix2 = jobName2 + ".o"
66 expErrorFilePrefix3 = jobName3 + ".e"
67 expOutputFilePrefix3 = jobName3 + ".o"
68
69 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
70 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
71 lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*")
72 lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*")
73 lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*")
74 lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*")
75
76 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0)
77 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
78 isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0)
79 isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0)
80 isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0)
81 isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0)
82
83 os.system("rm launcherFileTest*.py *.e* *.o*")
84 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
85 self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2)
86 self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3)
87
88 def test_submitJob_job_already_submitted(self):
89 self._iRepetJob.createTable(self._jobTableName, "jobs")
90 iJob = self._createJobInstance("job")
91 self._iRepetJob.recordJob(iJob)
92
93 isSysExitRaised = False
94 try:
95 self._iRepetJob.submitJob(iJob)
96 except SystemExit:
97 isSysExitRaised = True
98 self.assertTrue(isSysExitRaised)
99
100 def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
101 self._iRepetJob.createTable(self._jobTableName, "jobs")
102 iJob = self._createJobInstance("job")
103 self._createLauncherFile(iJob)
104 self._iRepetJob.recordJob(iJob)
105 self._iRepetJob.changeJobStatus(iJob, "running", "method")
106
107 expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % ( iJob.jobid )
108
109 obsError = "obsError.txt"
110 obsErrorHandler = open(obsError, "w")
111 stderrRef = sys.stderr
112 sys.stderr = obsErrorHandler
113
114 isSysExitRaised = False
115 try:
116 self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, timeOutPerJob=3)
117 except SystemExit:
118 isSysExitRaised = True
119
120 obsErrorHandler.close()
121
122 obsErrorHandler = open(obsError, "r")
123 obsMsg = obsErrorHandler.readline()
124 obsErrorHandler.close()
125
126 sys.stderr = stderrRef
127 os.remove(obsError)
128 os.system("rm launcherFileTest*.py")
129 self.assertTrue(isSysExitRaised)
130 self.assertEquals(expMsg, obsMsg)
131
132 def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
133 self._iRepetJob.createTable(self._jobTableName, "jobs")
134 iJob = self._createJobInstance("job")
135 self._createLauncherFile(iJob)
136
137 self._iRepetJob.recordJob(iJob)
138 self._iRepetJob.changeJobStatus(iJob, "error", "method")
139
140 self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 2)
141
142 time.sleep(10)
143
144 expJobStatus = "finished"
145 obsJobStatus1 = self._iRepetJob.getJobStatus(iJob)
146
147 self.assertEquals(expJobStatus, obsJobStatus1)
148
149 jobName = iJob.jobname
150
151 expErrorFilePrefix1 = jobName + ".e"
152 expOutputFilePrefix1 = jobName + ".o"
153
154 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
155 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
156
157 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0)
158 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
159
160 self._iRepetJob.removeJob(iJob)
161 os.system("rm launcherFileTest*.py *.e* *.o*")
162 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
163
164
165 def test_isJobStillHandledBySge_True(self):
166 self._iRepetJob.createTable(self._jobTableName, "jobs")
167 iJob = self._createJobInstance("job")
168 self._createLauncherFile(iJob)
169 self._iRepetJob.submitJob(iJob)
170
171 isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname)
172 os.system("rm launcherFileTest*.py")
173
174 self.assertTrue(isJobHandledBySge)
175
176 def test_isJobStillHandledBySge_False(self):
177 self._iRepetJob.createTable(self._jobTableName, "jobs")
178 iJob = self._createJobInstance("job")
179 self._createLauncherFile(iJob)
180 self._iRepetJob.recordJob(iJob)
181
182 isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname)
183 os.system("rm launcherFileTest*.py")
184
185 self.assertFalse(isJobHandledBySge)
186
187 def _createJobInstance(self, name):
188 return Job(self._jobTableName, 0, name, "test", "", "date;sleep 5;date", "./launcherFileTest_"+ name +".py")
189
190 def _createLauncherFile(self, iJob):
191 jobFileHandler = open( iJob.launcher , "w" )
192
193 launcher = "#!/usr/bin/python\n"
194 launcher += "import os\n"
195 launcher += "import sys\n"
196
197 launcher += "print \"system:\", os.uname()\n"
198 launcher += "sys.stdout.flush()\n"
199 newStatus = "running"
200 prg = "%s/bin/srptChangeJobStatus.py" % (os.environ["REPET_PATH"])
201 cmd = prg
202 cmd += " -t %s" % ( iJob.tablename )
203 cmd += " -n %s" % ( iJob.jobname )
204 cmd += " -g %s" % ( iJob.groupid )
205 if iJob.queue != "":
206 cmd += " -q %s" % ( iJob.queue )
207 cmd += " -s %s" % ( newStatus )
208 cmd += " -c %s" %( self._configFileName )
209 cmd += " -v 1"
210 launcher +="os.system( \"" + cmd + "\" )\n"
211
212 launcher += "print \"LAUNCH: "+ iJob.command + "\"\n"
213 launcher += "sys.stdout.flush()\n"
214 launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n"
215 launcher += "if exitStatus != 0:\n"
216 launcher += "\tprint \"ERROR: "+ iJob.command + " returned exit status '%i'\" % ( exitStatus )\n"
217
218 newStatus = "finished"
219 prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py"
220 cmd = prg
221 cmd += " -t %s" % ( iJob.tablename )
222 cmd += " -n %s" % ( iJob.jobname )
223 cmd += " -g %s" % ( iJob.groupid )
224 if iJob.queue != "":
225 cmd += " -q %s" % ( iJob.queue )
226 cmd += " -s %s" % ( newStatus )
227 cmd += " -c %s" %( self._configFileName )
228 cmd += " -v 1"
229 launcher +="os.system( \"" + cmd + "\" )\n"
230 launcher += "sys.exit(0)\n"
231 jobFileHandler.write(launcher)
232 jobFileHandler.close()
233 os.chmod( iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC )
234
235 if __name__ == "__main__":
236 unittest.main()