diff commons/core/sql/test/Tst_RepetJob.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/sql/test/Tst_RepetJob.py	Fri Jan 18 04:54:14 2013 -0500
@@ -0,0 +1,395 @@
+import unittest
+import sys
+import os
+import time
+from commons.core.sql.DbMySql import DbMySql
+from commons.core.sql.Job import Job
+from commons.core.sql.RepetJob import RepetJob
+from commons.core.utils.FileUtils import FileUtils
+
+#TODO: to remove... => replace all RepetJob() by TableJobAdaptator()...
+class Test_RepetJob( unittest.TestCase ):
+    
+    def setUp(self):
+        self._jobTableName = "dummyJobTable"
+        self._db = DbMySql()
+        self._iRepetJob = RepetJob()
+    
+    def tearDown(self):
+        self._iRepetJob = None
+        self._db.close()
+        
+    def _createJobInstance(self):
+        return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" )
+    
+    def test_createJobTable_is_table_created(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+    
+        isTableCreated = self._db.doesTableExist(self._jobTableName)
+        self.assertTrue(isTableCreated)
+    
+        self._db.dropTable(self._jobTableName)
+    
+    def test_createJobTable_field_list(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+
+        obsLFiled = self._db.getFieldList(self._jobTableName)
+        expLField = ["jobid", "jobname", "groupid", "command", "launcher", "queue", "status", "time", "node"]
+    
+        self.assertEquals(expLField, obsLFiled)
+    
+        self._db.dropTable(self._jobTableName)
+    
+    def test_recordJob(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        self._iRepetJob.recordJob(iJob)
+    
+        qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = %s" 
+        params = (iJob.jobid)
+        
+        self._db.execute(qryParams, params)
+        
+        tObs = self._db.fetchall()[0]
+        tExp =(iJob.jobid, iJob.groupid, iJob.command, iJob.launcher, iJob.queue, "waiting", "?")
+        
+        self.assertEquals(tExp,tObs)
+
+        self._db.dropTable(self._jobTableName)
+    
+    def test_removeJob(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        self._iRepetJob.recordJob(iJob)
+
+        self._iRepetJob.removeJob(iJob)
+        
+        isTableEmpty = self._db.isEmpty(self._jobTableName)
+        
+        self.assertTrue(isTableEmpty)
+        
+        self._db.dropTable(self._jobTableName)
+        
+    def test_getJobStatus(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        self._iRepetJob.recordJob(iJob)
+
+        expStatus = "waiting"
+        obsStatus = self._iRepetJob.getJobStatus(iJob)
+        
+        self.assertEquals(expStatus, obsStatus)
+        self._db.dropTable(self._jobTableName)
+    
+    def test_getJobStatus_unknown(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()        
+
+        expStatus = "unknown"
+        obsStatus = self._iRepetJob.getJobStatus(iJob)
+        
+        self.assertEquals(expStatus, obsStatus)
+        self._db.dropTable(self._jobTableName)
+    
+    def test_getJobStatus_no_name(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) 
+        
+        expStatus = "unknown"
+        obsStatus = self._iRepetJob.getJobStatus(iJob)
+        
+        self.assertEquals(expStatus, obsStatus)
+        self._db.dropTable(self._jobTableName)
+        
+    def test_getJobStatus_non_unique_job(self):
+        # Warning : this case will not append, because recordJob() begin by removeJob()
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        sqlCmd = "INSERT INTO %s" % ( iJob.tablename )
+        sqlCmd += " VALUES ("
+        sqlCmd += " \"%s\"," % ( iJob.jobid )
+        sqlCmd += " \"%s\"," % ( iJob.jobname )
+        sqlCmd += " \"%s\"," % ( iJob.groupid )
+        sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") )
+        sqlCmd += " \"%s\"," % ( iJob.launcher )
+        sqlCmd += " \"%s\"," % ( iJob.queue )
+        sqlCmd += " \"waiting\","
+        sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
+        sqlCmd += " \"?\" );"
+        self._db.execute( sqlCmd )
+        self._db.execute( sqlCmd )
+        
+        expError = "expError.txt"
+        expErrorHandler = open(expError, "w")
+        expErrorHandler.write("ERROR while getting job status: non-unique jobs\n")
+        expErrorHandler.close()
+        
+        obsError = "obsError.txt"
+        obsErrorHandler = open(obsError, "w")
+        stderrRef = sys.stderr
+        sys.stderr = obsErrorHandler
+        
+        isSysExitRaised = False
+        try:
+            self._iRepetJob.getJobStatus(iJob)
+        except SystemExit:
+            isSysExitRaised = True
+           
+        obsErrorHandler.close()
+        
+        self.assertTrue(isSysExitRaised)
+        self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError))
+        
+        sys.stderr = stderrRef
+        os.remove(obsError)
+        os.remove(expError)
+         
+        self._db.dropTable(self._jobTableName)
+        
+    def test_updateInfoTable(self):
+        self._iRepetJob.updateInfoTable(self._jobTableName, "dummyInfo")
+        
+        qryParams = "SELECT name, file FROM  info_tables WHERE name=%s AND file=%s"
+        params = (self._jobTableName, "dummyInfo")
+        
+        self._db.execute(qryParams, params)
+        tObs = self._db.fetchall()[0]
+        tExp = (self._jobTableName, "dummyInfo")
+        
+        self.assertEquals(tExp, tObs)
+        
+        self._db.dropTable(self._jobTableName)
+        
+    def test_changeJobStatus(self):
+        expStatus = "finished"
+        
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        self._iRepetJob.recordJob(iJob)
+        self._iRepetJob.changeJobStatus(iJob, expStatus, "method")
+        
+        qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s" 
+        params = (iJob.jobid, iJob.groupid, iJob.queue)
+        self._db.execute(qryParams, params)
+
+        obsStatus = self._db.fetchall()[0][0]
+        self.assertEquals(expStatus, obsStatus)
+        self._iRepetJob.removeJob(iJob)
+        self._db.dropTable(self._jobTableName)
+
+    def test_getCountStatus(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        
+        iJob1 = self._createJobInstance()
+        iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2")
+        
+        self._iRepetJob.recordJob(iJob1)
+        self._iRepetJob.recordJob(iJob2)
+
+        expCount = 2
+        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, iJob1.groupid, "waiting")
+        
+        self.assertEquals(expCount, obsCount)
+        
+    def test_getCountStatus_without_res(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        expCount = 0
+        
+        obsCount = self._iRepetJob.getCountStatus(self._jobTableName, "groupid", "waiting")
+        
+        self.assertEquals(expCount, obsCount)
+   
+    def test_cleanJobGroup(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob1 = self._createJobInstance()
+        iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
+        iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
+        
+        self._iRepetJob.recordJob(iJob1)
+        self._iRepetJob.recordJob(iJob2)
+        self._iRepetJob.recordJob(iJob3)
+        
+        self._iRepetJob.cleanJobGroup(self._jobTableName, iJob1.groupid)
+        
+        qryParams = "SELECT count(*) FROM " + self._jobTableName  
+        
+        self._db.execute(qryParams)
+        
+        expCount = 1
+        obsCount = self._db.fetchall()[0][0]
+        
+        self.assertEquals(expCount, obsCount)
+        
+        self._iRepetJob.removeJob(iJob3)
+        self._db.dropTable(self._jobTableName)
+
+    def test_hasUnfinishedJob(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob1 = self._createJobInstance()
+        iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
+        iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
+        
+        self._iRepetJob.recordJob(iJob1)
+        self._iRepetJob.recordJob(iJob2)
+        self._iRepetJob.recordJob(iJob3)
+        
+        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")
+        
+        expHasGrpIdFinished = True
+        obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
+        
+        self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
+        
+        self._iRepetJob.removeJob(iJob1)
+        self._iRepetJob.removeJob(iJob2)
+        self._iRepetJob.removeJob(iJob3)
+        self._db.dropTable(self._jobTableName)
+        
+    def test_hasUnfinishedJob_JobTableNotExists(self):
+        iJob1 = self._createJobInstance()
+        
+        expHasGrpIdFinished = False
+        obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
+        
+        self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
+        
+    def test_hasUnfinishedJob_AllJobsFinished(self): 
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob1 = self._createJobInstance()
+        iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2")
+        iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2")
+        
+        self._iRepetJob.recordJob(iJob1)
+        self._iRepetJob.recordJob(iJob2)
+        self._iRepetJob.recordJob(iJob3)
+        
+        self._iRepetJob.changeJobStatus(iJob1, "finished", "method")
+        self._iRepetJob.changeJobStatus(iJob2, "finished", "method")
+        
+        expHasGrpIdFinished = False
+        obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid)
+        
+        self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished)
+        
+        self._iRepetJob.removeJob(iJob1)
+        self._iRepetJob.removeJob(iJob2)
+        self._iRepetJob.removeJob(iJob3)
+        self._db.dropTable(self._jobTableName)
+        
+    def test_waitJobGroup_with_finished_job(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        self._iRepetJob.recordJob(iJob)
+        self._iRepetJob.changeJobStatus(iJob, "finished", "method")
+        
+        Obs = self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0)
+        Exp = None
+        
+        self.assertEquals(Exp, Obs)
+        self._iRepetJob.removeJob(iJob) 
+        
+    def test_waitJobGroup_with_error_job_maxRelaunch_zero(self):
+        Obs = False
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        self._iRepetJob.recordJob(iJob)
+        self._iRepetJob.changeJobStatus(iJob, "error", "method")
+        
+        try:
+            self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 0)
+        except SystemExit:
+            Obs = True
+        
+        self.assertTrue(Obs)
+        self._iRepetJob.removeJob(iJob)
+        
+    def test_setJobIdFromSge(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance()
+        self._iRepetJob.recordJob(iJob)
+        self._iRepetJob.setJobIdFromSge(iJob, 1000)
+        
+        qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s" 
+        params = (iJob.jobname, iJob.queue, iJob.groupid)
+        
+        self._db.execute(qryParams, params)
+        
+        tObs = self._db.fetchall()[0]
+        tExp =(1000,)
+        
+        self.assertEquals(tExp,tObs)
+        
+        self._db.dropTable(self._jobTableName)
+        
+    def test_submitJob_8_fields_for_job_table(self):
+        iJob = self._createJobInstance()
+        self._db.dropTable(self._jobTableName)
+        sqlCmd = "CREATE TABLE " + self._jobTableName 
+        sqlCmd += " ( jobid INT UNSIGNED"
+        sqlCmd += ", groupid VARCHAR(255)"
+        sqlCmd += ", command TEXT"
+        sqlCmd += ", launcher VARCHAR(1024)"
+        sqlCmd += ", queue VARCHAR(255)"
+        sqlCmd += ", status VARCHAR(255)"
+        sqlCmd += ", time DATETIME"
+        sqlCmd += ", node VARCHAR(255) )"
+        self._db.execute(sqlCmd)
+        
+        self._iRepetJob.submitJob(iJob)
+        
+        expFieldsNb = 9
+        obsFieldsNb = len(self._iRepetJob.getFieldList(self._jobTableName))
+        
+        self.assertEquals(expFieldsNb, obsFieldsNb)
+        
+        self._db.dropTable(self._jobTableName)
+        
+    def test_getNodesListByGroupId(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
+        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
+        iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" )
+        
+        self._insertJob(iJob1)
+        self._insertJob(iJob2)
+        self._insertJob(iJob3)
+        
+        expNodeList = ["node1", "node2"]
+        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid")
+        self.assertEquals(expNodeList, obsNodeList)
+        
+        self._db.dropTable(self._jobTableName)
+        
+    def test_getNodesListByGroupId_empty_list(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" )
+        iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" )
+        iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" )
+        
+        self._insertJob(iJob1)
+        self._insertJob(iJob2)
+        self._insertJob(iJob3)
+        
+        expNodeList = []
+        obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid3")
+        self.assertEquals(expNodeList, obsNodeList)
+        
+        self._db.dropTable(self._jobTableName)
+        
+    def _insertJob(self, iJob):
+        self._iRepetJob.removeJob( iJob )
+        sqlCmd = "INSERT INTO %s" % ( iJob.tablename )
+        sqlCmd += " VALUES ("
+        sqlCmd += " \"%s\"," % ( iJob.jobid )
+        sqlCmd += " \"%s\"," % ( iJob.jobname )
+        sqlCmd += " \"%s\"," % ( iJob.groupid )
+        sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") )
+        sqlCmd += " \"%s\"," % ( iJob.launcher )
+        sqlCmd += " \"%s\"," % ( iJob.queue )
+        sqlCmd += " \"waiting\","
+        sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
+        sqlCmd += " \"%s\" );" % ( iJob.node )
+        self._iRepetJob.execute( sqlCmd )
+        
+if __name__ == "__main__":
+    unittest.main()