comparison commons/core/sql/test/Tst_RepetJob.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 import unittest
2 import sys
3 import os
4 import time
5 from commons.core.sql.DbMySql import DbMySql
6 from commons.core.sql.Job import Job
7 from commons.core.sql.RepetJob import RepetJob
8 from commons.core.utils.FileUtils import FileUtils
9
10 #TODO: to remove... => replace all RepetJob() by TableJobAdaptator()...
class Test_RepetJob( unittest.TestCase ):
    """Integration tests for RepetJob, run against a live database.

    Every test works on the table named by ``self._jobTableName``.
    ``self._db`` is a separate DbMySql handle used to inspect what
    RepetJob wrote. The table is dropped in tearDown(), so a failing
    assertion cannot leave it behind for the next test.
    """

    def setUp(self):
        """Open the inspection connection and create the object under test."""
        self._jobTableName = "dummyJobTable"
        self._db = DbMySql()
        self._iRepetJob = RepetJob()

    def tearDown(self):
        """Drop the job table and close the inspection connection.

        Dropping here rather than at the end of each test keeps the
        database clean when an assertion fails part-way through a test.
        """
        self._iRepetJob = None
        try:
            # NOTE(review): dropTable() is presumed to be a no-op when the
            # table does not exist; test_submitJob_8_fields_for_job_table
            # already calls it before creating the table. Confirm against
            # DbMySql.dropTable.
            self._db.dropTable(self._jobTableName)
        finally:
            self._db.close()

    def _createJobInstance(self):
        """Return the reference Job used by most tests (jobid 0, "job1")."""
        return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" )

    def _buildInsertJobSql(self, iJob, node):
        """Return a raw INSERT statement recording iJob as "waiting".

        The statement bypasses recordJob(), which lets a test insert the
        same job twice. ``node`` is the value stored in the last column.
        """
        lValues = [
            iJob.jobid,
            iJob.jobname,
            iJob.groupid,
            iJob.command.replace("\"", "\'"),
            iJob.launcher,
            iJob.queue,
            "waiting",
            time.strftime( "%Y-%m-%d %H:%M:%S" ),
            node,
        ]
        sqlValues = ", ".join(["\"%s\"" % value for value in lValues])
        return "INSERT INTO %s VALUES ( %s );" % ( iJob.tablename, sqlValues )

    def _insertJob(self, iJob):
        """Insert iJob with a raw INSERT, after removing any previous record."""
        self._iRepetJob.removeJob( iJob )
        self._iRepetJob.execute( self._buildInsertJobSql(iJob, iJob.node) )

    def test_createJobTable_is_table_created(self):
        """createTable() must make the job table exist."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        self.assertTrue(self._db.doesTableExist(self._jobTableName))

    def test_createJobTable_field_list(self):
        """createTable() must create the nine expected columns, in order."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        expLField = ["jobid", "jobname", "groupid", "command", "launcher", "queue", "status", "time", "node"]
        obsLField = self._db.getFieldList(self._jobTableName)

        self.assertEqual(expLField, obsLField)

    def test_recordJob(self):
        """recordJob() must store the job with status "waiting" and node "?"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
        # One-element tuple: "(x)" alone is just x, not a parameter sequence.
        params = (iJob.jobid,)
        self._db.execute(qryParams, params)

        tExp = (iJob.jobid, iJob.groupid, iJob.command, iJob.launcher, iJob.queue, "waiting", "?")
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_removeJob(self):
        """removeJob() must leave the table empty once the only job is removed."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        self._iRepetJob.removeJob(iJob)

        self.assertTrue(self._db.isEmpty(self._jobTableName))

    def test_getJobStatus(self):
        """A freshly recorded job must be reported as "waiting"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)

        expStatus = "waiting"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_unknown(self):
        """A job that was never recorded must be reported as "unknown"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()

        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_name(self):
        """An unrecorded job with an empty name must be reported as "unknown"."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" )

        expStatus = "unknown"
        obsStatus = self._iRepetJob.getJobStatus(iJob)

        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_non_unique_job(self):
        """getJobStatus() must exit and report on stderr when a job is duplicated."""
        # Warning: this case should not happen in practice, because
        # recordJob() begins with removeJob().
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        sqlCmd = self._buildInsertJobSql(iJob, "?")
        self._db.execute( sqlCmd )
        self._db.execute( sqlCmd )

        expError = "expError.txt"
        obsError = "obsError.txt"
        try:
            expErrorHandler = open(expError, "w")
            try:
                expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
            finally:
                expErrorHandler.close()

            # Capture stderr in a file; always restore it, even when the
            # assertion below fails, so later output is not lost.
            obsErrorHandler = open(obsError, "w")
            stderrRef = sys.stderr
            sys.stderr = obsErrorHandler
            try:
                self.assertRaises(SystemExit, self._iRepetJob.getJobStatus, iJob)
            finally:
                sys.stderr = stderrRef
                obsErrorHandler.close()

            self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
        finally:
            for fileName in (expError, obsError):
                if os.path.exists(fileName):
                    os.remove(fileName)

    def test_updateInfoTable(self):
        """updateInfoTable() must add a (name, file) row to info_tables."""
        self._iRepetJob.updateInfoTable(self._jobTableName, "dummyInfo")

        qryParams = "SELECT name, file FROM info_tables WHERE name=%s AND file=%s"
        params = (self._jobTableName, "dummyInfo")
        self._db.execute(qryParams, params)

        tExp = (self._jobTableName, "dummyInfo")
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_changeJobStatus(self):
        """changeJobStatus() must update the status stored for the job."""
        expStatus = "finished"

        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, expStatus, "method")

        qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
        params = (iJob.jobid, iJob.groupid, iJob.queue)
        self._db.execute(qryParams, params)

        obsStatus = self._db.fetchall()[0][0]

        self.assertEqual(expStatus, obsStatus)

    def test_getCountStatus(self):
        """getCountStatus() must count both waiting jobs of the group."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        self._iRepetJob.recordJob(iJob1)
        self._iRepetJob.recordJob(iJob2)

        expCount = 2
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, iJob1.groupid, "waiting")

        self.assertEqual(expCount, obsCount)

    def test_getCountStatus_without_res(self):
        """getCountStatus() must return 0 on an empty job table."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")

        expCount = 0
        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, "groupid", "waiting")

        self.assertEqual(expCount, obsCount)

    def test_cleanJobGroup(self):
        """After cleanJobGroup() on the first job's group, one row must remain."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # NOTE(review): these two calls pass 8 positional arguments while
        # every other Job(...) in this file passes 9 (no separate job name
        # here). Kept unchanged; confirm against Job.__init__.
        iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        self._iRepetJob.recordJob(iJob1)
        self._iRepetJob.recordJob(iJob2)
        self._iRepetJob.recordJob(iJob3)

        self._iRepetJob.cleanJobGroup(self._jobTableName, iJob1.groupid)

        qryParams = "SELECT count(*) FROM " + self._jobTableName
        self._db.execute(qryParams)

        expCount = 1
        obsCount = self._db.fetchall()[0][0]

        self.assertEqual(expCount, obsCount)

    def test_hasUnfinishedJob(self):
        """hasUnfinishedJob() must be True while one job of the group is not finished."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        self._iRepetJob.recordJob(iJob1)
        self._iRepetJob.recordJob(iJob2)
        self._iRepetJob.recordJob(iJob3)

        # Only iJob2 is finished; iJob1 in the same group is left as recorded.
        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")

        expHasGrpIdFinished = True
        obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasGrpIdFinished, obsHasGrpIdFinished)

    def test_hasUnfinishedJob_JobTableNotExists(self):
        """hasUnfinishedJob() must be False when the job table was never created."""
        iJob1 = self._createJobInstance()

        expHasGrpIdFinished = False
        obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasGrpIdFinished, obsHasGrpIdFinished)

    def test_hasUnfinishedJob_AllJobsFinished(self):
        """hasUnfinishedJob() must be False once iJob1 and iJob2 are both finished."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = self._createJobInstance()
        # NOTE(review): these two calls pass 8 positional arguments while
        # every other Job(...) in this file passes 9 (no separate job name
        # here). Kept unchanged; confirm against Job.__init__.
        iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")

        self._iRepetJob.recordJob(iJob1)
        self._iRepetJob.recordJob(iJob2)
        self._iRepetJob.recordJob(iJob3)

        self._iRepetJob.changeJobStatus(iJob1, "finished", "method")
        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")

        expHasGrpIdFinished = False
        obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)

        self.assertEqual(expHasGrpIdFinished, obsHasGrpIdFinished)

    def test_waitJobGroup_with_finished_job(self):
        """waitJobGroup() must return None when the group's only job is finished."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "finished", "method")

        Exp = None
        Obs = self._iRepetJob.waitJobGroup(self._jobTableName, iJob.groupid, 0)

        self.assertEqual(Exp, Obs)

    def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
        """waitJobGroup() must raise SystemExit for a job in "error" status."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.changeJobStatus(iJob, "error", "method")

        # NOTE(review): going by the test name, the last 0 is presumably the
        # maximum number of relaunches; confirm against waitJobGroup().
        self.assertRaises(SystemExit, self._iRepetJob.waitJobGroup, self._jobTableName, iJob.groupid, 0, 0)

    def test_setJobIdFromSge(self):
        """setJobIdFromSge() must store the new job id for the recorded job."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob = self._createJobInstance()
        self._iRepetJob.recordJob(iJob)
        self._iRepetJob.setJobIdFromSge(iJob, 1000)

        qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
        params = (iJob.jobname, iJob.queue, iJob.groupid)
        self._db.execute(qryParams, params)

        tExp = (1000,)
        tObs = self._db.fetchall()[0]

        self.assertEqual(tExp, tObs)

    def test_submitJob_8_fields_for_job_table(self):
        """After submitJob() on an 8-column table, the table must have 9 fields."""
        iJob = self._createJobInstance()
        # Start from a hand-made table that has no "jobname" column.
        self._db.dropTable(self._jobTableName)
        sqlCmd = "CREATE TABLE " + self._jobTableName
        sqlCmd += " ( jobid INT UNSIGNED"
        sqlCmd += ", groupid VARCHAR(255)"
        sqlCmd += ", command TEXT"
        sqlCmd += ", launcher VARCHAR(1024)"
        sqlCmd += ", queue VARCHAR(255)"
        sqlCmd += ", status VARCHAR(255)"
        sqlCmd += ", time DATETIME"
        sqlCmd += ", node VARCHAR(255) )"
        self._db.execute(sqlCmd)

        self._iRepetJob.submitJob(iJob)

        expFieldsNb = 9
        obsFieldsNb = len(self._iRepetJob.getFieldList(self._jobTableName))

        self.assertEqual(expFieldsNb, obsFieldsNb)

    def test_getNodesListByGroupId(self):
        """getNodesListByGroupId() must list only the nodes of the requested group."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" )

        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)

        expNodeList = ["node1", "node2"]
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid")

        self.assertEqual(expNodeList, obsNodeList)

    def test_getNodesListByGroupId_empty_list(self):
        """getNodesListByGroupId() must return [] for a group with no job."""
        self._iRepetJob.createTable(self._jobTableName, "jobs")
        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
        iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" )

        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)

        expNodeList = []
        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid3")

        self.assertEqual(expNodeList, obsNodeList)
393
# Run the whole test case when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()