comparison commons/core/sql/test/Test_TableJobAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 import unittest
2 import sys
3 import os
4 import time
5 #import stat
6 #import threading
7 from commons.core.sql.DbMySql import DbMySql
8 #from commons.core.sql.DbSQLite import DbSQLite
9 from commons.core.sql.Job import Job
10 from commons.core.utils.FileUtils import FileUtils
11 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
12
13 #class Test_TableJobAdaptator_SQLite( unittest.TestCase ):
14 #
15 # def setUp(self):
16 # self._jobTableName = "dummyJobTable"
17 # self._dbName = "test.db"
18 # self._db = DbSQLite(self._dbName)
19 # self._iTJA = TableJobAdaptator(self._db, self._jobTableName)
20 # if not self._db.doesTableExist(self._jobTableName):
21 # self._db.createJobTable(self._jobTableName)
22 # self._iJob = self._createJobInstance()
23 #
24 # def tearDown(self):
25 # self._iTJA = None
26 # self._db.close()
27 ## self._db.delete()
28 #
29 ## def test_recordJob(self):
30 ## self._iTJA.recordJob(self._iJob)
31 ## qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = ?"
32 ## params = (self._iJob.jobid,)
33 ## self._db.execute(qryParams, params)
34 ## tObs = self._db.fetchall()[0]
35 ## tExp =(self._iJob.jobid, self._iJob.groupid, self._iJob.command, self._iJob.launcher, self._iJob.queue, "waiting", "?")
36 ## self.assertEquals(tExp,tObs)
37 ##
38 ## def test_removeJob(self):
39 ## self._iTJA.recordJob(self._iJob)
40 ## self._iTJA.removeJob(self._iJob)
41 ## self.assertTrue(self._db.isEmpty(self._jobTableName))
42 ##
43 ## def test_getJobStatus(self):
44 ## self._iTJA.recordJob(self._iJob)
45 ## expStatus = "waiting"
46 ## obsStatus = self._iTJA.getJobStatus(self._iJob)
47 ## self.assertEquals(expStatus, obsStatus)
48 ##
49 ## def test_getJobStatus_no_job(self):
50 ## expStatus = "unknown"
51 ## obsStatus = self._iTJA.getJobStatus(self._iJob)
52 ## self.assertEquals(expStatus, obsStatus)
53 ##
54 ## def test_getJobStatus_no_name(self):
55 ## iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" )
56 ## expStatus = "unknown"
57 ## obsStatus = self._iTJA.getJobStatus(iJob)
58 ## self.assertEquals(expStatus, obsStatus)
59 ##
60 ## def test_getJobStatus_two_jobs(self):
61 ## # Warning : this case will not append, because recordJob() begin by removeJob()
62 ## sqlCmd = "INSERT INTO %s" % self._iJob.tablename
63 ## sqlCmd += " VALUES ("
64 ## sqlCmd += " \"%s\"," % self._iJob.jobid
65 ## sqlCmd += " \"%s\"," % self._iJob.jobname
66 ## sqlCmd += " \"%s\"," % self._iJob.groupid
67 ## sqlCmd += " \"%s\"," % self._iJob.command.replace("\"","\'")
68 ## sqlCmd += " \"%s\"," % self._iJob.launcher
69 ## sqlCmd += " \"%s\"," % self._iJob.queue
70 ## sqlCmd += " \"waiting\","
71 ## sqlCmd += " \"%s\"," % time.strftime( "%Y-%m-%d %H:%M:%S" )
72 ## sqlCmd += " \"?\" );"
73 ## self._db.execute(sqlCmd)
74 ## self._db.execute(sqlCmd)
75 ##
76 ## expError = "expError.txt"
77 ## expErrorHandler = open(expError, "w")
78 ## expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
79 ## expErrorHandler.close()
80 ## obsError = "obsError.txt"
81 ## obsErrorHandler = open(obsError, "w")
82 ## stderrRef = sys.stderr
83 ## sys.stderr = obsErrorHandler
84 ##
85 ## isSysExitRaised = False
86 ## try:
87 ## self._iTJA.getJobStatus(self._iJob)
88 ## except SystemExit:
89 ## isSysExitRaised = True
90 ##
91 ## obsErrorHandler.close()
92 ##
93 ## self.assertTrue(isSysExitRaised)
94 ## self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
95 ## sys.stderr = stderrRef
96 ## os.remove(obsError)
97 ## os.remove(expError)
98 ##
99 ## def test_changeJobStatus(self):
100 ## expStatus = "finished"
101 ## self._iTJA.recordJob(self._iJob)
102 ## self._iTJA.changeJobStatus(self._iJob, expStatus)
103 ## qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =? AND groupid=? AND queue=?"
104 ## params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue)
105 ## self._db.execute(qryParams, params)
106 ## obsStatus = self._db.fetchall()[0][0]
107 ## self.assertEquals(expStatus, obsStatus)
108 ## self._iTJA.removeJob(self._iJob)
109 ##
110 ## def test_getCountStatus(self):
111 ## iJob1 = self._createJobInstance()
112 ## iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
113 ## self._iTJA.recordJob(iJob1)
114 ## self._iTJA.recordJob(iJob2)
115 ## expCount = 2
116 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, iJob1.groupid, "waiting")
117 ## self.assertEquals(expCount, obsCount)
118 ##
119 ## def test_getCountStatus_without_res(self):
120 ## expCount = 0
121 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, "groupid", "waiting")
122 ## self.assertEquals(expCount, obsCount)
123 ##
124 ## def test_cleanJobGroup(self):
125 ## iJob1 = self._createJobInstance()
126 ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
127 ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
128 ## self._iTJA.recordJob(iJob1)
129 ## self._iTJA.recordJob(iJob2)
130 ## self._iTJA.recordJob(iJob3)
131 ## self._iTJA.cleanJobGroup(self._jobTableName, iJob1.groupid)
132 ## qryParams = "SELECT count(*) FROM " + self._jobTableName
133 ## self._db.execute(qryParams)
134 ## expCount = 1
135 ## obsCount = self._db.fetchall()[0][0]
136 ## self.assertEquals(expCount, obsCount)
137 ##
138 ## def test_hasUnfinishedJob_one_waiting_one_finished(self):
139 ## iJob1 = self._createJobInstance()
140 ## iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
141 ## iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
142 ## self._iTJA.recordJob(iJob1)
143 ## self._iTJA.recordJob(iJob2)
144 ## self._iTJA.recordJob(iJob3)
145 ## self._iTJA.changeJobStatus(iJob2, "finished")
146 ## expHasGrpIdFinished = True
147 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
148 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
149 ##
150 ## def test_hasUnfinishedJob_jobTable_doesnt_exist(self):
151 ## self._db.dropTable(self._jobTableName)
152 ## expHasGrpIdFinished = False
153 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, self._iJob.groupid)
154 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
155 ##
156 ## def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self):
157 ## iJob1 = self._createJobInstance()
158 ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
159 ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
160 ## self._iTJA.recordJob(iJob1)
161 ## self._iTJA.recordJob(iJob2)
162 ## self._iTJA.recordJob(iJob3)
163 ## self._iTJA.changeJobStatus(iJob1, "finished")
164 ## self._iTJA.changeJobStatus(iJob2, "finished")
165 ## expHasGrpIdFinished = False
166 ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
167 ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
168 ##
169 ## def test_waitJobGroup_with_finished_job(self):
170 ## obs = False
171 ## self._iTJA.recordJob(self._iJob)
172 ## self._iTJA.changeJobStatus(self._iJob, "finished")
173 ## try:
174 ## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0)
175 ## except SystemExit:
176 ## obs = True
177 ## self.assertFalse(obs)
178 ##
179 ## def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
180 ## obs = False
181 ## self._iTJA.recordJob(self._iJob)
182 ## self._iTJA.changeJobStatus(self._iJob, "error")
183 ## try:
184 ## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0)
185 ## except SystemExit:
186 ## obs = True
187 ## self.assertTrue(obs)
188 ##
189 ## def test_setJobIdFromSge(self):
190 ## self._iTJA.recordJob(self._iJob)
191 ## self._iTJA.setJobIdFromSge(self._iJob, 1000)
192 ## qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = ? AND queue = ? AND groupid = ?"
193 ## params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid)
194 ## self._db.execute(qryParams, params)
195 ## tObs = self._db.fetchall()[0]
196 ## tExp =(1000,)
197 ## self.assertEquals(tExp,tObs)
198 ##
199 ## def test_submitJob_8_fields_for_job_table(self):
200 ## self._db.dropTable(self._jobTableName)
201 ## sqlCmd = "CREATE TABLE " + self._jobTableName
202 ## sqlCmd += " ( jobid INT UNSIGNED"
203 ## sqlCmd += ", groupid VARCHAR(255)"
204 ## sqlCmd += ", command TEXT"
205 ## sqlCmd += ", launcher VARCHAR(1024)"
206 ## sqlCmd += ", queue VARCHAR(255)"
207 ## sqlCmd += ", status VARCHAR(255)"
208 ## sqlCmd += ", time DATETIME"
209 ## sqlCmd += ", node VARCHAR(255) )"
210 ## self._db.execute(sqlCmd)
211 ## self._iTJA.submitJob(self._iJob)
212 ## expFieldsNb = 9
213 ## obsFieldsNb = len(self._db.getFieldList(self._jobTableName))
214 ## self.assertEquals(expFieldsNb, obsFieldsNb)
215 ## os.remove("jobid.stdout")
216 ##
217 ## def test_getNodesListByGroupId(self):
218 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
219 ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
220 ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" )
221 ## self._insertJob(iJob1)
222 ## self._insertJob(iJob2)
223 ## self._insertJob(iJob3)
224 ## expNodeList = ["node1", "node2"]
225 ## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid")
226 ## self.assertEquals(expNodeList, obsNodeList)
227 ##
228 ## def test_getNodesListByGroupId_empty_list(self):
229 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
230 ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
231 ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" )
232 ## self._insertJob(iJob1)
233 ## self._insertJob(iJob2)
234 ## self._insertJob(iJob3)
235 ## expNodeList = []
236 ## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid3")
237 ## self.assertEquals(expNodeList, obsNodeList)
238 ##
239 ## def test_commitJob(self):
240 ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
241 ## self._insertJob(iJob1)
242 ##
243 ## expJobStatus = "waiting"
244 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob)
245 ## self.assertEquals(expJobStatus, obsJobStatus)
246 ## expJobStatus = "waiting"
247 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob)
248 ## self.assertEquals(expJobStatus, obsJobStatus)
249 ## self._db.close()
250 ##
251 ## self._db = DbSQLite(self._dbName)
252 ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName)
253 ## expJobStatus = "waiting"
254 ## obsJobStatus = self._iTJA.getJobStatus(self._iJob)
255 ## self.assertEquals(expJobStatus, obsJobStatus)
256 ##
257 ## def _insertJob(self, iJob):
258 ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName)
259 ## self._iTJA.removeJob( iJob )
260 ## sqlCmd = "INSERT INTO %s" % ( iJob.tablename )
261 ## sqlCmd += " VALUES ("
262 ## sqlCmd += " \"%s\"," % ( iJob.jobid )
263 ## sqlCmd += " \"%s\"," % ( iJob.jobname )
264 ## sqlCmd += " \"%s\"," % ( iJob.groupid )
265 ## sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") )
266 ## sqlCmd += " \"%s\"," % ( iJob.launcher )
267 ## sqlCmd += " \"%s\"," % ( iJob.queue )
268 ## sqlCmd += " \"waiting\","
269 ## sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
270 ## sqlCmd += " \"%s\" );" % ( iJob.node )
271 ## self._db.execute( sqlCmd )
272 #
273 ## def testRecordJob_in_parallel_with_2_thread(self) :
274 ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py")
275 ## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py")
276 ##
277 ## db1 = DbSQLite('threadJobTable.db')
278 ## db1.createJobTable(self._jobTableName)
279 ##
280 ## db2 = DbSQLite(self._dbName)
281 ##
282 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName)
283 ## iTJA2 = TableJobAdaptator(db2, self._jobTableName)
284 ##
285 ## iRJT1 = RecordJobThread(iTJA1, job1)
286 ## iRJT2 = RecordJobThread(iTJA2, job2)
287 ## iRJT1.start()
288 ## iRJT2.start()
289 ##
290 ## while iRJT1.isAlive() or iRJT2.isAlive():
291 ## time.sleep(5)
292 ##
293 ## expJobStatus = "waiting"
294 ## obsJobStatus1 = iTJA1.getJobStatus(job1)
295 ## obsJobStatus2 = iTJA2.getJobStatus(job2)
296 ##
297 ## self.assertEquals(expJobStatus, obsJobStatus1)
298 ## self.assertEquals(expJobStatus, obsJobStatus2)
299 ## db1.db.close()
300 ## db1.delete()
301 ##
302 #
303 # def test_ThreadRecordJob_sqlite3_connection_object_different_instances(self):
304 #
305 ## for i in range(1, 11):
306 ## job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i)
307 ## db1 = DbSQLite(self._dbName)
308 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName)
309 ## iRJT1 = RecordJobThread(iTJA1, job)
310 #
311 # #self._db.createJobTable(self._jobTableName)
312 #
313 # for i in range(1, 30) :
314 # job = "job%s"% i
315 # db = "db%s"%i
316 # job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i)
317 # db = DbSQLite(self._dbName)
318 # if i == 1 :
319 # db.createJobTable(self._jobTableName)
320 # iTJA = TableJobAdaptator(db, self._jobTableName)
321 # iRJT = RecordJobThread(iTJA, job)
322 # iRJT.start()
323 #
324 # #while iRJT.isAlive() :
325 # #time.sleep(1)
326 #
327 ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py")
328 ## self._createLauncherFile(job1)
329 ## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py")
330 ## self._createLauncherFile(job2)
331 ##
332 ## db1 = DbSQLite(self._dbName)
333 ## db2 = DbSQLite(self._dbName)
334 ##
335 ## iTJA1 = TableJobAdaptator(db1, self._jobTableName)
336 ## iTJA2 = TableJobAdaptator(db2, self._jobTableName)
337 ##
338 ##
339 ## iRJT1 = RecordJobThread(iTJA1, job1)
340 ## iRJT2 = RecordJobThread(iTJA2, job2)
341 ##
342 ## iRJT1.start()
343 ## iRJT2.start()
344 ##
345 ## while iRJT1.isAlive() or iRJT2.isAlive():
346 ## time.sleep(5)
347 #
348 #
349 ## self.assertNotEquals(iRJT1._iTableJobAdaptator._iDb.db, iRJT2._iTableJobAdaptator._iDb.db)
350 #
351 #
352 # def _createLauncherFile(self, iJob):
353 # jobFileHandler = open(iJob.launcher , "w")
354 ## self.cdir
355 ## self.job
356 # cDir = os.getcwd()
357 #
358 # launcher = "#!/usr/bin/python\n"
359 # launcher += "import os\n"
360 # launcher += "import sys\n"
361 #
362 # launcher += "print \"system:\", os.uname()\n"
363 # launcher += "sys.stdout.flush()\n"
364 #
365 # newStatus = "running"
366 # launcher += "from commons.core.sql.Job import Job\n"
367 # launcher += "from commons.core.sql.DbSQLite import DbSQLite\n"
368 # launcher += "from commons.core.sql.TableJobAdaptator import TableJobAdaptator\n"
369 # launcher += "iJob = Job('%s', %s, '%s', '%s')\n" % (iJob.tablename, iJob.jobid, iJob.jobname, iJob.groupid)
370 # launcher += "iDb = DbSQLite('%s/%s')\n" % (cDir, self._dbName)
371 # launcher += "iTJA = TableJobAdaptator(iDb, '%s')\n" % self._jobTableName
372 # launcher += "if not iDb.doesTableExist('%s'):\n" % (iJob.tablename)
373 # launcher += "\tiDb.createJobTable('%s')\n" % self._jobTableName
374 #
375 # launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus
376 #
377 # launcher += "print \"LAUNCH: " + iJob.command + "\"\n"
378 # launcher += "sys.stdout.flush()\n"
379 # launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n"
380 # launcher += "if exitStatus != 0:\n"
381 # launcher += "\tprint \"ERROR: " + iJob.command + " returned exit status '%i'\" % ( exitStatus )\n"
382 #
383 # newStatus = "finished"
384 # launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus
385 # launcher += "iDb.close()\n"
386 #
387 # launcher += "sys.exit(0)\n"
388 # jobFileHandler.write(launcher)
389 # jobFileHandler.close()
390 # os.chmod(iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
391 #
392 # def _createJobInstance(self):
393 # return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" )
394
395
396 class Test_TableJobAdaptator_MySQL( unittest.TestCase ):
397
398 def setUp(self):
399 self._jobTableName = "dummyJobTable"
400 self._db = DbMySql()
401 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
402 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
403 self._iJob = self._createJobInstance()
404
405 def tearDown(self):
406 self._db.dropTable(self._jobTableName)
407 self._iTJA = None
408 self._db.close()
409
410 def test_recordJob(self):
411 self._iTJA.recordJob(self._iJob)
412 qryParams = "SELECT jobid, jobname, groupid, launcher, queue, resources, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
413 params = (self._iJob.jobid)
414 self._db.execute(qryParams, params)
415 tObs = self._db.fetchall()[0]
416 tExp =(self._iJob.jobid, self._iJob.jobname, self._iJob.groupid, self._iJob.launcher, self._iJob.queue, "['mem_free=10M']", "waiting", "?")
417 self.assertEquals(tExp,tObs)
418
419 def test_removeJob(self):
420 self._iTJA.recordJob(self._iJob)
421 self._iTJA.removeJob(self._iJob)
422 isTableEmpty = self._db.isEmpty(self._jobTableName)
423 self.assertTrue(isTableEmpty)
424
425 def test_getJobStatus(self):
426 self._iTJA.recordJob(self._iJob)
427 expStatus = "waiting"
428 obsStatus = self._iTJA.getJobStatus(self._iJob)
429 self.assertEquals(expStatus, obsStatus)
430
431 def test_getJobStatus_no_job(self):
432 expStatus = "unknown"
433 obsStatus = self._iTJA.getJobStatus(self._iJob)
434 self.assertEquals(expStatus, obsStatus)
435
436 def test_getJobStatus_no_name(self):
437 iJob = Job(self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources")
438 expStatus = "unknown"
439 obsStatus = self._iTJA.getJobStatus(iJob)
440 self.assertEquals(expStatus, obsStatus)
441
442 def test_getJobStatus_two_jobs(self):
443 # Warning : this case will not append, because recordJob() begin by removeJob()
444 sqlCmd = "INSERT INTO %s" % self._jobTableName
445 sqlCmd += " VALUES ("
446 sqlCmd += " \"%s\"," % self._iJob.jobid
447 sqlCmd += " \"%s\"," % self._iJob.jobname
448 sqlCmd += " \"%s\"," % self._iJob.groupid
449 sqlCmd += " \"%s\"," % self._iJob.launcher
450 sqlCmd += " \"%s\"," % self._iJob.queue
451 sqlCmd += " \"%s\"," % self._iJob.lResources
452 sqlCmd += " \"waiting\","
453 sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
454 sqlCmd += " \"?\" );"
455 self._db.execute(sqlCmd)
456 self._db.execute(sqlCmd)
457
458 expError = "expError.txt"
459 expErrorHandler = open(expError, "w")
460 expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
461 expErrorHandler.close()
462 obsError = "obsError.txt"
463 obsErrorHandler = open(obsError, "w")
464 stderrRef = sys.stderr
465 sys.stderr = obsErrorHandler
466
467 isSysExitRaised = False
468 try:
469 self._iTJA.getJobStatus(self._iJob)
470 except SystemExit:
471 isSysExitRaised = True
472 obsErrorHandler.close()
473 self.assertTrue(isSysExitRaised)
474 self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
475 sys.stderr = stderrRef
476 os.remove(obsError)
477 os.remove(expError)
478
479 def test_changeJobStatus(self):
480 expStatus = "finished"
481 self._iTJA.recordJob(self._iJob)
482 self._iTJA.changeJobStatus(self._iJob, expStatus)
483 qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
484 params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue)
485 self._db.execute(qryParams, params)
486 obsStatus = self._db.fetchall()[0][0]
487 self.assertEquals(expStatus, obsStatus)
488
489 def test_getCountStatus(self):
490 iJob1 = self._createJobInstance()
491 iJob2 = Job(1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
492 self._iTJA.recordJob(iJob1)
493 self._iTJA.recordJob(iJob2)
494 expCount = 2
495 obsCount = self._iTJA.getCountStatus(iJob1.groupid, "waiting")
496 self.assertEquals(expCount, obsCount)
497
498 def test_getCountStatus_without_res(self):
499 expCount = 0
500 obsCount = self._iTJA.getCountStatus("groupid", "waiting")
501 self.assertEquals(expCount, obsCount)
502
503 def test_cleanJobGroup(self):
504 iJob1 = self._createJobInstance()
505 iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
506 iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
507 self._iTJA.recordJob(iJob1)
508 self._iTJA.recordJob(iJob2)
509 self._iTJA.recordJob(iJob3)
510 self._iTJA.cleanJobGroup(iJob1.groupid)
511 qryParams = "SELECT count(*) FROM %s" % self._jobTableName
512 self._db.execute(qryParams)
513 expCount = 1
514 obsCount = self._db.fetchall()[0][0]
515 self.assertEquals(expCount, obsCount)
516
517 def test_hasUnfinishedJob_one_waiting_one_finished(self):
518 iJob1 = self._createJobInstance()
519 iJob2 = Job(0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
520 iJob3 = Job(0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
521 self._iTJA.recordJob(iJob1)
522 self._iTJA.recordJob(iJob2)
523 self._iTJA.recordJob(iJob3)
524 self._iTJA.changeJobStatus(iJob2, "finished")
525 expHasGrpIdFinished = True
526 obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
527 self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
528
529 def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self):
530 iJob1 = self._createJobInstance()
531 iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
532 iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
533 self._iTJA.recordJob(iJob1)
534 self._iTJA.recordJob(iJob2)
535 self._iTJA.recordJob(iJob3)
536 self._iTJA.changeJobStatus(iJob1, "finished")
537 self._iTJA.changeJobStatus(iJob2, "finished")
538 expHasGrpIdFinished = False
539 obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
540 self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
541
542 def test_waitJobGroup_with_finished_job(self):
543 obs = False
544 self._iTJA.recordJob(self._iJob)
545 self._iTJA.changeJobStatus(self._iJob, "finished")
546 try:
547 self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0)
548 except SystemExit:
549 obs = True
550 self.assertFalse(obs)
551
552 def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
553 obs = False
554 self._iTJA.recordJob(self._iJob)
555 self._iTJA.changeJobStatus(self._iJob, "error")
556 try:
557 self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0)
558 except SystemExit:
559 obs = True
560 self.assertTrue(obs)
561
562 #TODO: how to test ?!?
563 # def test_waitJobGroup_with_error_relaunch(self):
564 # iJob = Job(0, "job1", "groupid", "queue.q", "command", "launcherFile", "node", ["mem_free=10M", "test=TRUE"])
565 # obs = False
566 # self._iTJA.recordJob(iJob)
567 # self._iTJA.changeJobStatus(iJob, "error")
568 # try:
569 # self._iTJA.waitJobGroup(iJob.groupid)
570 # except SystemExit:
571 # obs = True
572 # self.assertTrue(obs)
573
574 def test_updateJobIdInDB(self):
575 self._iTJA.recordJob(self._iJob)
576 self._iTJA.updateJobIdInDB(self._iJob, 1000)
577 qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
578 params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid)
579 self._db.execute(qryParams, params)
580 tObs = self._db.fetchall()[0]
581 tExp =(1000,)
582 self.assertEquals(tExp,tObs)
583
584 def test_getNodesListByGroupId(self):
585 iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
586 iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
587 iJob3 = Job(2, "job3", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
588 iJob4 = Job(3, "job4", "groupid2", "queue", "command", "launcherFile", "node3", "lResources")
589 self._insertJob(iJob1)
590 self._insertJob(iJob2)
591 self._insertJob(iJob3)
592 self._insertJob(iJob4)
593 expNodeList = ["node1", "node2"]
594 obsNodeList = self._iTJA.getNodesListByGroupId("groupid")
595 self.assertEquals(expNodeList, obsNodeList)
596
597 def test_getNodesListByGroupId_empty_list(self):
598 iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
599 iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
600 iJob3 = Job(2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources")
601 self._insertJob(iJob1)
602 self._insertJob(iJob2)
603 self._insertJob(iJob3)
604 expNodeList = []
605 obsNodeList = self._iTJA.getNodesListByGroupId("groupid3")
606 self.assertEquals(expNodeList, obsNodeList)
607
608 # TODO test TableJobAdaptator._createJobInstance TableJobAdaptator._createLauncherFile
609 def _insertJob(self, iJob):
610 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
611 self._iTJA.removeJob(iJob)
612 sqlCmd = "INSERT INTO %s" % self._jobTableName
613 sqlCmd += " VALUES ("
614 sqlCmd += " \"%s\"," % iJob.jobid
615 sqlCmd += " \"%s\"," % iJob.jobname
616 sqlCmd += " \"%s\"," % iJob.groupid
617 sqlCmd += " \"%s\"," % iJob.launcher
618 sqlCmd += " \"%s\"," % iJob.queue
619 sqlCmd += " \"%s\"," % iJob.lResources
620 sqlCmd += " \"waiting\","
621 sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
622 sqlCmd += " \"%s\" );" % iJob.node
623 self._db.execute(sqlCmd)
624
625 def _createJobInstance(self):
626 return Job(0, "job1", "groupid", "", "command", "launcherFile", "node", ["mem_free=10M"])
627
628 #class RecordJobThread(threading.Thread):
629 #
630 # def __init__(self, iTableJobAdaptator, iJob):
631 # threading.Thread.__init__(self)
632 # self._iTableJobAdaptator = iTableJobAdaptator
633 # self._iJob = iJob
634 #
635 # def run(self):
636 # self._iTableJobAdaptator.recordJob(self._iJob)
637 # #self._iTableJobAdaptator.submitJob(self._iJob)
638
639 if __name__ == "__main__":
640 unittest.main()