6
|
1 import os
|
|
2 import time
|
|
3 import sys
|
|
4 import stat
|
|
5 import unittest
|
|
6 import glob
|
|
7 from commons.core.sql.DbMySql import DbMySql
|
|
8 from commons.core.sql.RepetJob import RepetJob
|
|
9 from commons.core.sql.Job import Job
|
|
10
|
|
class Test_F_RepetJob(unittest.TestCase):
    """Functional tests for RepetJob job submission and monitoring.

    The tests use a real database connection (DbMySql) and write a
    ``dummyConfigFile`` built from the REPET_HOST, REPET_USER, REPET_PW,
    REPET_DB and REPET_PORT environment variables. They generate launcher
    scripts that call ``$REPET_PATH/bin/srptChangeJobStatus.py``, so the full
    REPET environment must be available.

    Every file a test creates is removed in tearDown:
      - the config file;
      - each job's launcher script;
      - each job's ``<jobname>.e*`` / ``<jobname>.o*`` output files.
    Files are therefore cleaned up even when an assertion fails. Only this
    test's own files are touched, never arbitrary ``*.e*`` / ``*.o*`` files
    in the working directory.
    """

    def setUp(self):
        self._jobTableName = "dummyJobTable"
        self._configFileName = "dummyConfigFile"
        # Jobs created through _createJobInstance; their files are removed in tearDown.
        self._lJobs = []
        # Read every required environment variable before opening the DB or
        # writing to disk. A missing variable then fails fast without leaving
        # a half-written config file behind (tearDown does not run when setUp
        # raises).
        configContent = self._buildConfigContent()
        self._db = DbMySql()
        self._iRepetJob = RepetJob()
        with open(self._configFileName, "w") as configF:
            configF.write(configContent)

    def tearDown(self):
        """Drop the job table, close the DB, and delete every file the test created."""
        self._iRepetJob = None
        try:
            try:
                self._db.dropTable(self._jobTableName)
            finally:
                # Close the connection even if dropping the table failed.
                self._db.close()
        finally:
            self._removeTestFiles()

    def test_submitJob_with_multiple_jobs(self):
        lJobs = []
        for name in ("job1", "job2", "job3"):
            iJob = self._createJobInstance(name)
            self._createLauncherFile(iJob)
            lJobs.append(iJob)

        for iJob in lJobs:
            self._iRepetJob.submitJob(iJob, maxNbWaitingJobs=3, checkInterval=5, verbose=0)

        # Fixed wait: assumes all three jobs ("date;sleep 5;date") have been
        # scheduled and have finished within 70 s.
        time.sleep(70)

        for iJob in lJobs:
            self.assertEqual("finished", self._iRepetJob.getJobStatus(iJob))
        for iJob in lJobs:
            self.assertTrue(self._hasErrorAndOutputFiles(iJob),
                            "missing .e*/.o* files for job '%s'" % iJob.jobname)

    def test_submitJob_job_already_submitted(self):
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        self._iRepetJob.recordJob(iJob)

        # Submitting a job that is already recorded must terminate via SystemExit.
        self.assertRaises(SystemExit, self._iRepetJob.submitJob, iJob)

    def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        self._createLauncherFile(iJob)
        self._iRepetJob.recordJob(iJob)
        # Mark the job "running" without ever submitting it, so the scheduler
        # does not know about it.
        self._iRepetJob.changeJobStatus(iJob, "running", "method")

        expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % ( iJob.jobid )

        obsError = "obsError.txt"
        stderrRef = sys.stderr
        try:
            with open(obsError, "w") as obsErrorHandler:
                sys.stderr = obsErrorHandler
                try:
                    self.assertRaises(SystemExit, self._iRepetJob.waitJobGroup,
                                      self._jobTableName, iJob.groupid, timeOutPerJob=3)
                finally:
                    # Always restore stderr, even if waitJobGroup raised
                    # something other than SystemExit.
                    sys.stderr = stderrRef
            with open(obsError, "r") as obsErrorHandler:
                obsMsg = obsErrorHandler.readline()
        finally:
            if os.path.exists(obsError):
                os.remove(obsError)

        self.assertEqual(expMsg, obsMsg)

    def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        self._createLauncherFile(iJob)

        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "error", "method")

        # NOTE(review): positional args presumably mean timeOutPerJob=0 and
        # maxRelaunch=2 (per the test name); confirm against
        # RepetJob.waitJobGroup.
        self._iRepetJob.waitJobGroup(self._jobTableName, iJob.groupid, 0, 2)

        # Fixed wait for the relaunched job to complete.
        time.sleep(10)

        self.assertEqual("finished", self._iRepetJob.getJobStatus(iJob))

        hasFiles = self._hasErrorAndOutputFiles(iJob)
        self._iRepetJob.removeJob(iJob)
        self.assertTrue(hasFiles, "missing .e*/.o* files for job '%s'" % iJob.jobname)

    def test_isJobStillHandledBySge_True(self):
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        self._createLauncherFile(iJob)
        self._iRepetJob.submitJob(iJob)

        self.assertTrue(self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname))

    def test_isJobStillHandledBySge_False(self):
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance("job")
        self._createLauncherFile(iJob)
        # Only recorded in the table, never submitted.
        self._iRepetJob.recordJob(iJob)

        self.assertFalse(self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname))

    def _buildConfigContent(self):
        """Return the text of the [repet_env] config file, built from REPET_* env vars.

        Raises KeyError if any of REPET_HOST/USER/PW/DB/PORT is unset.
        """
        lLines = ["[repet_env]\n"]
        for key in ("host", "user", "pw", "db", "port"):
            lLines.append("repet_%s: %s\n" % (key, os.environ["REPET_" + key.upper()]))
        return "".join(lLines)

    def _createJobInstance(self, name):
        """Create a Job named *name* whose launcher is ./launcherFileTest_<name>.py.

        The job is remembered so that tearDown can delete its files.
        """
        iJob = Job(self._jobTableName, 0, name, "test", "", "date;sleep 5;date", "./launcherFileTest_" + name + ".py")
        self._lJobs.append(iJob)
        return iJob

    def _hasErrorAndOutputFiles(self, iJob):
        """Return True if both <jobname>.e* and <jobname>.o* files exist in the cwd."""
        hasErrorFile = bool(glob.glob(iJob.jobname + ".e*"))
        hasOutputFile = bool(glob.glob(iJob.jobname + ".o*"))
        return hasErrorFile and hasOutputFile

    def _removeTestFiles(self):
        """Delete the config file plus each tracked job's launcher and .e*/.o* files.

        Missing files are skipped, so this is safe to call whatever stage the
        test reached.
        """
        lPaths = [self._configFileName]
        for iJob in self._lJobs:
            lPaths.append(iJob.launcher)
            lPaths.extend(glob.glob(iJob.jobname + ".e*"))
            lPaths.extend(glob.glob(iJob.jobname + ".o*"))
        for path in lPaths:
            if os.path.exists(path):
                os.remove(path)

    def _getChangeJobStatusCmd(self, iJob, newStatus):
        """Return the shell command that sets *iJob*'s status to *newStatus*.

        The command runs $REPET_PATH/bin/srptChangeJobStatus.py with this
        test's config file. The -q option is only added when the job has a
        queue.
        """
        cmd = "%s/bin/srptChangeJobStatus.py" % (os.environ["REPET_PATH"])
        cmd += " -t %s" % ( iJob.tablename )
        cmd += " -n %s" % ( iJob.jobname )
        cmd += " -g %s" % ( iJob.groupid )
        if iJob.queue != "":
            cmd += " -q %s" % ( iJob.queue )
        cmd += " -s %s" % ( newStatus )
        cmd += " -c %s" % ( self._configFileName )
        cmd += " -v 1"
        return cmd

    def _createLauncherFile(self, iJob):
        """Write an executable launcher script for *iJob* at iJob.launcher.

        The generated script:
          1. prints the host system info;
          2. marks the job "running";
          3. runs iJob.command and reports a non-zero exit status;
          4. marks the job "finished";
          5. exits 0.

        NOTE: the generated script uses Python 2 ``print`` statements and a
        ``#!/usr/bin/python`` shebang. It requires a Python 2 interpreter at
        that path.
        """
        # Build the commands (which read REPET_PATH) before creating the file,
        # so a missing variable does not leave an empty launcher on disk.
        lLines = [
            "#!/usr/bin/python",
            "import os",
            "import sys",
            "print \"system:\", os.uname()",
            "sys.stdout.flush()",
            "os.system( \"" + self._getChangeJobStatusCmd(iJob, "running") + "\" )",
            "print \"LAUNCH: " + iJob.command + "\"",
            "sys.stdout.flush()",
            "exitStatus = os.system (\"" + iJob.command + "\")",
            "if exitStatus != 0:",
            "\tprint \"ERROR: " + iJob.command + " returned exit status '%i'\" % ( exitStatus )",
            "os.system( \"" + self._getChangeJobStatusCmd(iJob, "finished") + "\" )",
            "sys.exit(0)",
        ]
        with open(iJob.launcher, "w") as jobFileHandler:
            jobFileHandler.write("\n".join(lLines) + "\n")
        os.chmod(iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
|
|
if __name__ == "__main__":
    # Run directly as a script (not imported): hand control to unittest,
    # which runs every TestCase defined in this module.
    unittest.main()