Mercurial > repos > yufei-luo > s_mart
view commons/core/sql/test/Test_TableJobAdaptator.py @ 16:6135c3075bc5
Deleted selected files
author | m-zytnicki |
---|---|
date | Mon, 22 Apr 2013 11:09:41 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
import unittest import sys import os import time #import stat #import threading from commons.core.sql.DbMySql import DbMySql #from commons.core.sql.DbSQLite import DbSQLite from commons.core.sql.Job import Job from commons.core.utils.FileUtils import FileUtils from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory #class Test_TableJobAdaptator_SQLite( unittest.TestCase ): # # def setUp(self): # self._jobTableName = "dummyJobTable" # self._dbName = "test.db" # self._db = DbSQLite(self._dbName) # self._iTJA = TableJobAdaptator(self._db, self._jobTableName) # if not self._db.doesTableExist(self._jobTableName): # self._db.createJobTable(self._jobTableName) # self._iJob = self._createJobInstance() # # def tearDown(self): # self._iTJA = None # self._db.close() ## self._db.delete() # ## def test_recordJob(self): ## self._iTJA.recordJob(self._iJob) ## qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = ?" 
## params = (self._iJob.jobid,) ## self._db.execute(qryParams, params) ## tObs = self._db.fetchall()[0] ## tExp =(self._iJob.jobid, self._iJob.groupid, self._iJob.command, self._iJob.launcher, self._iJob.queue, "waiting", "?") ## self.assertEquals(tExp,tObs) ## ## def test_removeJob(self): ## self._iTJA.recordJob(self._iJob) ## self._iTJA.removeJob(self._iJob) ## self.assertTrue(self._db.isEmpty(self._jobTableName)) ## ## def test_getJobStatus(self): ## self._iTJA.recordJob(self._iJob) ## expStatus = "waiting" ## obsStatus = self._iTJA.getJobStatus(self._iJob) ## self.assertEquals(expStatus, obsStatus) ## ## def test_getJobStatus_no_job(self): ## expStatus = "unknown" ## obsStatus = self._iTJA.getJobStatus(self._iJob) ## self.assertEquals(expStatus, obsStatus) ## ## def test_getJobStatus_no_name(self): ## iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) ## expStatus = "unknown" ## obsStatus = self._iTJA.getJobStatus(iJob) ## self.assertEquals(expStatus, obsStatus) ## ## def test_getJobStatus_two_jobs(self): ## # Warning : this case will not append, because recordJob() begin by removeJob() ## sqlCmd = "INSERT INTO %s" % self._iJob.tablename ## sqlCmd += " VALUES (" ## sqlCmd += " \"%s\"," % self._iJob.jobid ## sqlCmd += " \"%s\"," % self._iJob.jobname ## sqlCmd += " \"%s\"," % self._iJob.groupid ## sqlCmd += " \"%s\"," % self._iJob.command.replace("\"","\'") ## sqlCmd += " \"%s\"," % self._iJob.launcher ## sqlCmd += " \"%s\"," % self._iJob.queue ## sqlCmd += " \"waiting\"," ## sqlCmd += " \"%s\"," % time.strftime( "%Y-%m-%d %H:%M:%S" ) ## sqlCmd += " \"?\" );" ## self._db.execute(sqlCmd) ## self._db.execute(sqlCmd) ## ## expError = "expError.txt" ## expErrorHandler = open(expError, "w") ## expErrorHandler.write("ERROR while getting job status: non-unique jobs\n") ## expErrorHandler.close() ## obsError = "obsError.txt" ## obsErrorHandler = open(obsError, "w") ## stderrRef = sys.stderr ## sys.stderr = 
obsErrorHandler ## ## isSysExitRaised = False ## try: ## self._iTJA.getJobStatus(self._iJob) ## except SystemExit: ## isSysExitRaised = True ## ## obsErrorHandler.close() ## ## self.assertTrue(isSysExitRaised) ## self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError)) ## sys.stderr = stderrRef ## os.remove(obsError) ## os.remove(expError) ## ## def test_changeJobStatus(self): ## expStatus = "finished" ## self._iTJA.recordJob(self._iJob) ## self._iTJA.changeJobStatus(self._iJob, expStatus) ## qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =? AND groupid=? AND queue=?" ## params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue) ## self._db.execute(qryParams, params) ## obsStatus = self._db.fetchall()[0][0] ## self.assertEquals(expStatus, obsStatus) ## self._iTJA.removeJob(self._iJob) ## ## def test_getCountStatus(self): ## iJob1 = self._createJobInstance() ## iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2") ## self._iTJA.recordJob(iJob1) ## self._iTJA.recordJob(iJob2) ## expCount = 2 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, iJob1.groupid, "waiting") ## self.assertEquals(expCount, obsCount) ## ## def test_getCountStatus_without_res(self): ## expCount = 0 ## obsCount = self._iTJA.getCountStatus(self._jobTableName, "groupid", "waiting") ## self.assertEquals(expCount, obsCount) ## ## def test_cleanJobGroup(self): ## iJob1 = self._createJobInstance() ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") ## self._iTJA.recordJob(iJob1) ## self._iTJA.recordJob(iJob2) ## self._iTJA.recordJob(iJob3) ## self._iTJA.cleanJobGroup(self._jobTableName, iJob1.groupid) ## qryParams = "SELECT count(*) FROM " + self._jobTableName ## self._db.execute(qryParams) ## expCount = 
1 ## obsCount = self._db.fetchall()[0][0] ## self.assertEquals(expCount, obsCount) ## ## def test_hasUnfinishedJob_one_waiting_one_finished(self): ## iJob1 = self._createJobInstance() ## iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") ## iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") ## self._iTJA.recordJob(iJob1) ## self._iTJA.recordJob(iJob2) ## self._iTJA.recordJob(iJob3) ## self._iTJA.changeJobStatus(iJob2, "finished") ## expHasGrpIdFinished = True ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid) ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) ## ## def test_hasUnfinishedJob_jobTable_doesnt_exist(self): ## self._db.dropTable(self._jobTableName) ## expHasGrpIdFinished = False ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, self._iJob.groupid) ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) ## ## def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self): ## iJob1 = self._createJobInstance() ## iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") ## iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") ## self._iTJA.recordJob(iJob1) ## self._iTJA.recordJob(iJob2) ## self._iTJA.recordJob(iJob3) ## self._iTJA.changeJobStatus(iJob1, "finished") ## self._iTJA.changeJobStatus(iJob2, "finished") ## expHasGrpIdFinished = False ## obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(self._jobTableName, iJob1.groupid) ## self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) ## ## def test_waitJobGroup_with_finished_job(self): ## obs = False ## self._iTJA.recordJob(self._iJob) ## self._iTJA.changeJobStatus(self._iJob, "finished") ## try: ## self._iTJA.waitJobGroup(self._jobTableName 
,self._iJob.groupid, 0, 0) ## except SystemExit: ## obs = True ## self.assertFalse(obs) ## ## def test_waitJobGroup_with_error_job_maxRelaunch_zero(self): ## obs = False ## self._iTJA.recordJob(self._iJob) ## self._iTJA.changeJobStatus(self._iJob, "error") ## try: ## self._iTJA.waitJobGroup(self._jobTableName ,self._iJob.groupid, 0, 0) ## except SystemExit: ## obs = True ## self.assertTrue(obs) ## ## def test_setJobIdFromSge(self): ## self._iTJA.recordJob(self._iJob) ## self._iTJA.setJobIdFromSge(self._iJob, 1000) ## qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = ? AND queue = ? AND groupid = ?" ## params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid) ## self._db.execute(qryParams, params) ## tObs = self._db.fetchall()[0] ## tExp =(1000,) ## self.assertEquals(tExp,tObs) ## ## def test_submitJob_8_fields_for_job_table(self): ## self._db.dropTable(self._jobTableName) ## sqlCmd = "CREATE TABLE " + self._jobTableName ## sqlCmd += " ( jobid INT UNSIGNED" ## sqlCmd += ", groupid VARCHAR(255)" ## sqlCmd += ", command TEXT" ## sqlCmd += ", launcher VARCHAR(1024)" ## sqlCmd += ", queue VARCHAR(255)" ## sqlCmd += ", status VARCHAR(255)" ## sqlCmd += ", time DATETIME" ## sqlCmd += ", node VARCHAR(255) )" ## self._db.execute(sqlCmd) ## self._iTJA.submitJob(self._iJob) ## expFieldsNb = 9 ## obsFieldsNb = len(self._db.getFieldList(self._jobTableName)) ## self.assertEquals(expFieldsNb, obsFieldsNb) ## os.remove("jobid.stdout") ## ## def test_getNodesListByGroupId(self): ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" ) ## self._insertJob(iJob1) ## self._insertJob(iJob2) ## self._insertJob(iJob3) ## expNodeList = ["node1", "node2"] ## 
obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid") ## self.assertEquals(expNodeList, obsNodeList) ## ## def test_getNodesListByGroupId_empty_list(self): ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) ## iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) ## iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" ) ## self._insertJob(iJob1) ## self._insertJob(iJob2) ## self._insertJob(iJob3) ## expNodeList = [] ## obsNodeList = self._iTJA.getNodesListByGroupId(self._jobTableName, "groupid3") ## self.assertEquals(expNodeList, obsNodeList) ## ## def test_commitJob(self): ## iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) ## self._insertJob(iJob1) ## ## expJobStatus = "waiting" ## obsJobStatus = self._iTJA.getJobStatus(self._iJob) ## self.assertEquals(expJobStatus, obsJobStatus) ## expJobStatus = "waiting" ## obsJobStatus = self._iTJA.getJobStatus(self._iJob) ## self.assertEquals(expJobStatus, obsJobStatus) ## self._db.close() ## ## self._db = DbSQLite(self._dbName) ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName) ## expJobStatus = "waiting" ## obsJobStatus = self._iTJA.getJobStatus(self._iJob) ## self.assertEquals(expJobStatus, obsJobStatus) ## ## def _insertJob(self, iJob): ## self._iTJA = TableJobAdaptator(self._db, self._jobTableName) ## self._iTJA.removeJob( iJob ) ## sqlCmd = "INSERT INTO %s" % ( iJob.tablename ) ## sqlCmd += " VALUES (" ## sqlCmd += " \"%s\"," % ( iJob.jobid ) ## sqlCmd += " \"%s\"," % ( iJob.jobname ) ## sqlCmd += " \"%s\"," % ( iJob.groupid ) ## sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") ) ## sqlCmd += " \"%s\"," % ( iJob.launcher ) ## sqlCmd += " \"%s\"," % ( iJob.queue ) ## sqlCmd += " \"waiting\"," ## sqlCmd += " \"%s\"," % ( 
time.strftime( "%Y-%m-%d %H:%M:%S" ) ) ## sqlCmd += " \"%s\" );" % ( iJob.node ) ## self._db.execute( sqlCmd ) # ## def testRecordJob_in_parallel_with_2_thread(self) : ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py") ## job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py") ## ## db1 = DbSQLite('threadJobTable.db') ## db1.createJobTable(self._jobTableName) ## ## db2 = DbSQLite(self._dbName) ## ## iTJA1 = TableJobAdaptator(db1, self._jobTableName) ## iTJA2 = TableJobAdaptator(db2, self._jobTableName) ## ## iRJT1 = RecordJobThread(iTJA1, job1) ## iRJT2 = RecordJobThread(iTJA2, job2) ## iRJT1.start() ## iRJT2.start() ## ## while iRJT1.isAlive() or iRJT2.isAlive(): ## time.sleep(5) ## ## expJobStatus = "waiting" ## obsJobStatus1 = iTJA1.getJobStatus(job1) ## obsJobStatus2 = iTJA2.getJobStatus(job2) ## ## self.assertEquals(expJobStatus, obsJobStatus1) ## self.assertEquals(expJobStatus, obsJobStatus2) ## db1.db.close() ## db1.delete() ## # # def test_ThreadRecordJob_sqlite3_connection_object_different_instances(self): # ## for i in range(1, 11): ## job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i) ## db1 = DbSQLite(self._dbName) ## iTJA1 = TableJobAdaptator(db1, self._jobTableName) ## iRJT1 = RecordJobThread(iTJA1, job) # # #self._db.createJobTable(self._jobTableName) # # for i in range(1, 30) : # job = "job%s"% i # db = "db%s"%i # job = Job(self._jobTableName, 0, "job%s"% i, "test_Thread", "", "date;sleep 5;date", "./launcherFileTest_job%s.py" % i) # db = DbSQLite(self._dbName) # if i == 1 : # db.createJobTable(self._jobTableName) # iTJA = TableJobAdaptator(db, self._jobTableName) # iRJT = RecordJobThread(iTJA, job) # iRJT.start() # # #while iRJT.isAlive() : # #time.sleep(1) # ## job1 = Job(self._jobTableName, 0, "job1", "test", "", "date;sleep 5;date", "./launcherFileTest_job1.py") ## 
##        self._createLauncherFile(job1)
##        job2 = Job(self._jobTableName, 0, "job2", "test", "", "date;sleep 5;date", "./launcherFileTest_job2.py")
##        self._createLauncherFile(job2)
##
##        db1 = DbSQLite(self._dbName)
##        db2 = DbSQLite(self._dbName)
##
##        iTJA1 = TableJobAdaptator(db1, self._jobTableName)
##        iTJA2 = TableJobAdaptator(db2, self._jobTableName)
##
##
##        iRJT1 = RecordJobThread(iTJA1, job1)
##        iRJT2 = RecordJobThread(iTJA2, job2)
##
##        iRJT1.start()
##        iRJT2.start()
##
##        while iRJT1.isAlive() or iRJT2.isAlive():
##            time.sleep(5)
#
#
##        self.assertNotEquals(iRJT1._iTableJobAdaptator._iDb.db, iRJT2._iTableJobAdaptator._iDb.db)
#
#
#    def _createLauncherFile(self, iJob):
#        jobFileHandler = open(iJob.launcher , "w")
##        self.cdir
##        self.job
#        cDir = os.getcwd()
#
#        launcher = "#!/usr/bin/python\n"
#        launcher += "import os\n"
#        launcher += "import sys\n"
#
#        launcher += "print \"system:\", os.uname()\n"
#        launcher += "sys.stdout.flush()\n"
#
#        newStatus = "running"
#        launcher += "from commons.core.sql.Job import Job\n"
#        launcher += "from commons.core.sql.DbSQLite import DbSQLite\n"
#        launcher += "from commons.core.sql.TableJobAdaptator import TableJobAdaptator\n"
#        launcher += "iJob = Job('%s', %s, '%s', '%s')\n" % (iJob.tablename, iJob.jobid, iJob.jobname, iJob.groupid)
#        launcher += "iDb = DbSQLite('%s/%s')\n" % (cDir, self._dbName)
#        launcher += "iTJA = TableJobAdaptator(iDb, '%s')\n" % self._jobTableName
#        launcher += "if not iDb.doesTableExist('%s'):\n" % (iJob.tablename)
#        launcher += "\tiDb.createJobTable('%s')\n" % self._jobTableName
#
#        launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus
#
#        launcher += "print \"LAUNCH: " + iJob.command + "\"\n"
#        launcher += "sys.stdout.flush()\n"
#        launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n"
#        launcher += "if exitStatus != 0:\n"
#        launcher += "\tprint \"ERROR: " + iJob.command + " returned exit status '%i'\" % ( exitStatus )\n"
#
#        newStatus = "finished"
#        launcher += "iTJA.changeJobStatus(iJob, '%s')\n" % newStatus
#        launcher += "iDb.close()\n"
#
#        launcher += "sys.exit(0)\n"
#        jobFileHandler.write(launcher)
#        jobFileHandler.close()
#        os.chmod(iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
#
#    def _createJobInstance(self):
#        return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" )


class Test_TableJobAdaptator_MySQL(unittest.TestCase):
    """Tests of the job-table adaptator built by TableJobAdaptatorFactory on a DbMySql connection.

    Every test works on the scratch table "dummyJobTable", which is created
    in setUp() and dropped in tearDown().
    """

    def setUp(self):
        """Open the database, build the adaptator, create the job table and the reference job."""
        self._jobTableName = "dummyJobTable"
        self._db = DbMySql()
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        self._iJob = self._createJobInstance()

    def tearDown(self):
        """Drop the scratch job table and close the database connection."""
        self._db.dropTable(self._jobTableName)
        self._iTJA = None
        self._db.close()

    def test_recordJob(self):
        """A recorded job is stored with status "waiting" and node "?"."""
        self._iTJA.recordJob(self._iJob)
        qryParams = "SELECT jobid, jobname, groupid, launcher, queue, resources, status, node FROM " + self._jobTableName + " WHERE jobid = %s"
        # The trailing comma matters: without it the parentheses do not build
        # a tuple and a bare scalar would be handed to execute().
        params = (self._iJob.jobid,)
        self._db.execute(qryParams, params)
        tObs = self._db.fetchall()[0]
        tExp = (self._iJob.jobid, self._iJob.jobname, self._iJob.groupid, self._iJob.launcher, self._iJob.queue, "['mem_free=10M']", "waiting", "?")
        self.assertEqual(tExp, tObs)

    def test_removeJob(self):
        """Removing the only recorded job leaves the job table empty."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.removeJob(self._iJob)
        isTableEmpty = self._db.isEmpty(self._jobTableName)
        self.assertTrue(isTableEmpty)

    def test_getJobStatus(self):
        """A freshly recorded job has status "waiting"."""
        self._iTJA.recordJob(self._iJob)
        expStatus = "waiting"
        obsStatus = self._iTJA.getJobStatus(self._iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_job(self):
        """A job that was never recorded has status "unknown"."""
        expStatus = "unknown"
        obsStatus = self._iTJA.getJobStatus(self._iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_no_name(self):
        """An unrecorded job with an empty job name has status "unknown"."""
        # Same jobid-first argument order as every other Job(...) call of this
        # class, so that the empty string is the second argument (presumably
        # the job name, as in _createJobInstance) and not shifted by a leading
        # table name.
        iJob = Job(20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources")
        expStatus = "unknown"
        obsStatus = self._iTJA.getJobStatus(iJob)
        self.assertEqual(expStatus, obsStatus)

    def test_getJobStatus_two_jobs(self):
        """Two identical rows make getJobStatus() report an error on stderr and exit."""
        # Warning: this case cannot happen through recordJob(), because
        # recordJob() begins by calling removeJob(); the duplicate is forced
        # with raw SQL.
        sqlCmd = self._buildInsertCmd(self._iJob, "?")
        self._db.execute(sqlCmd)
        self._db.execute(sqlCmd)

        expError = "expError.txt"
        obsError = "obsError.txt"
        stderrRef = sys.stderr
        isSysExitRaised = False
        try:
            with open(expError, "w") as expErrorHandler:
                expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
            with open(obsError, "w") as obsErrorHandler:
                sys.stderr = obsErrorHandler
                try:
                    self._iTJA.getJobStatus(self._iJob)
                except SystemExit:
                    isSysExitRaised = True
                finally:
                    # Always give stderr back, even on an unexpected exception,
                    # so that later tests do not write to a closed file.
                    sys.stderr = stderrRef
            self.assertTrue(isSysExitRaised)
            self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
        finally:
            # Scratch files are removed whether or not the assertions passed.
            for fileName in (obsError, expError):
                if os.path.exists(fileName):
                    os.remove(fileName)

    def test_changeJobStatus(self):
        """changeJobStatus() stores the new status in the job table."""
        expStatus = "finished"
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, expStatus)
        qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s"
        params = (self._iJob.jobid, self._iJob.groupid, self._iJob.queue)
        self._db.execute(qryParams, params)
        obsStatus = self._db.fetchall()[0][0]
        self.assertEqual(expStatus, obsStatus)

    def test_getCountStatus(self):
        """Two recorded jobs of the same group are both counted as "waiting"."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        self._iTJA.recordJob(iJob1)
        self._iTJA.recordJob(iJob2)
        expCount = 2
        obsCount = self._iTJA.getCountStatus(iJob1.groupid, "waiting")
        self.assertEqual(expCount, obsCount)

    def test_getCountStatus_without_res(self):
        """The count is 0 when no job was recorded."""
        expCount = 0
        obsCount = self._iTJA.getCountStatus("groupid", "waiting")
        self.assertEqual(expCount, obsCount)

    def test_cleanJobGroup(self):
        """Cleaning a group leaves only the job recorded under another group id."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iTJA.recordJob(iJob)
        self._iTJA.cleanJobGroup(iJob1.groupid)
        qryParams = "SELECT count(*) FROM %s" % self._jobTableName
        self._db.execute(qryParams)
        expCount = 1
        obsCount = self._db.fetchall()[0][0]
        self.assertEqual(expCount, obsCount)

    def test_hasUnfinishedJob_one_waiting_one_finished(self):
        """A group with one finished job and one untouched job still has unfinished jobs."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iTJA.recordJob(iJob)
        self._iTJA.changeJobStatus(iJob2, "finished")
        expHasGrpIdFinished = True
        obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
        self.assertEqual(expHasGrpIdFinished, obsHasGrpIdFinished)

    def test_hasUnfinishedJob_all_jobs_finished_for_same_groupid(self):
        """A group whose jobs are all finished has no unfinished job, whatever the other groups."""
        iJob1 = self._createJobInstance()
        iJob2 = Job(2, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
        iJob3 = Job(3, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
        for iJob in (iJob1, iJob2, iJob3):
            self._iTJA.recordJob(iJob)
        self._iTJA.changeJobStatus(iJob1, "finished")
        self._iTJA.changeJobStatus(iJob2, "finished")
        expHasGrpIdFinished = False
        obsHasGrpIdFinished = self._iTJA.hasUnfinishedJob(iJob1.groupid)
        self.assertEqual(expHasGrpIdFinished, obsHasGrpIdFinished)

    def test_waitJobGroup_with_finished_job(self):
        """waitJobGroup(groupid, 0, 0) does not exit when the recorded job is finished."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, "finished")
        isSysExitRaised = False
        # SystemExit is trapped explicitly so that an unwanted exit is
        # reported as a test failure.
        try:
            self._iTJA.waitJobGroup(self._iJob.groupid, 0, 0)
        except SystemExit:
            isSysExitRaised = True
        self.assertFalse(isSysExitRaised)

    def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
        """waitJobGroup(groupid, 0, 0) exits when the recorded job is in error."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, "error")
        self.assertRaises(SystemExit, self._iTJA.waitJobGroup, self._iJob.groupid, 0, 0)

    #TODO: how to test ?!?
#    def test_waitJobGroup_with_error_relaunch(self):
#        iJob = Job(0, "job1", "groupid", "queue.q", "command", "launcherFile", "node", ["mem_free=10M", "test=TRUE"])
#        obs = False
#        self._iTJA.recordJob(iJob)
#        self._iTJA.changeJobStatus(iJob, "error")
#        try:
#            self._iTJA.waitJobGroup(iJob.groupid)
#        except SystemExit:
#            obs = True
#        self.assertTrue(obs)

    def test_updateJobIdInDB(self):
        """updateJobIdInDB() replaces the stored job id with the given one."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.updateJobIdInDB(self._iJob, 1000)
        qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s"
        params = (self._iJob.jobname, self._iJob.queue, self._iJob.groupid)
        self._db.execute(qryParams, params)
        tObs = self._db.fetchall()[0]
        tExp = (1000,)
        self.assertEqual(tExp, tObs)

    def test_getNodesListByGroupId(self):
        """Nodes of the requested group are listed once each; other groups are ignored."""
        iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
        iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
        iJob3 = Job(2, "job3", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
        iJob4 = Job(3, "job4", "groupid2", "queue", "command", "launcherFile", "node3", "lResources")
        for iJob in (iJob1, iJob2, iJob3, iJob4):
            self._insertJob(iJob)
        expNodeList = ["node1", "node2"]
        obsNodeList = self._iTJA.getNodesListByGroupId("groupid")
        self.assertEqual(expNodeList, obsNodeList)

    def test_getNodesListByGroupId_empty_list(self):
        """A group id matching no inserted job yields an empty node list."""
        iJob1 = Job(0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources")
        iJob2 = Job(1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources")
        iJob3 = Job(2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources")
        for iJob in (iJob1, iJob2, iJob3):
            self._insertJob(iJob)
        expNodeList = []
        obsNodeList = self._iTJA.getNodesListByGroupId("groupid3")
        self.assertEqual(expNodeList, obsNodeList)

    # TODO test TableJobAdaptator._createJobInstance TableJobAdaptator._createLauncherFile

    def _buildInsertCmd(self, iJob, node):
        """Return the raw INSERT statement storing iJob with status "waiting" on the given node.

        Every value is written between double quotes, in this order: jobid,
        jobname, groupid, launcher, queue, lResources, status, current local
        time, node.
        """
        lValues = [iJob.jobid, iJob.jobname, iJob.groupid, iJob.launcher, iJob.queue,
                   iJob.lResources, "waiting", time.strftime("%Y-%m-%d %H:%M:%S"), node]
        # Wrapping each value in a 1-tuple keeps "%s" formatting valid even if
        # a field happens to be a tuple itself.
        lQuotedValues = [" \"%s\"" % (value,) for value in lValues]
        return "INSERT INTO %s VALUES (%s );" % (self._jobTableName, ",".join(lQuotedValues))

    def _insertJob(self, iJob):
        """Insert iJob with raw SQL instead of recordJob(), keeping the node set on the job.

        The adaptator is re-created and iJob is removed from the table before
        the insertion.
        """
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
        self._iTJA.removeJob(iJob)
        self._db.execute(self._buildInsertCmd(iJob, iJob.node))

    def _createJobInstance(self):
        """Return the reference job stored in self._iJob by setUp()."""
        return Job(0, "job1", "groupid", "", "command", "launcherFile", "node", ["mem_free=10M"])


#class RecordJobThread(threading.Thread):
#
#    def __init__(self, iTableJobAdaptator, iJob):
#        threading.Thread.__init__(self)
#        self._iTableJobAdaptator = iTableJobAdaptator
#        self._iJob = iJob
#
#    def run(self):
#        self._iTableJobAdaptator.recordJob(self._iJob)
#        #self._iTableJobAdaptator.submitJob(self._iJob)

if __name__ == "__main__":
    unittest.main()