Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/sql/test/Tst_RepetJob.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/sql/test/Tst_RepetJob.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,395 @@ +import unittest +import sys +import os +import time +from commons.core.sql.DbMySql import DbMySql +from commons.core.sql.Job import Job +from commons.core.sql.RepetJob import RepetJob +from commons.core.utils.FileUtils import FileUtils + +#TODO: to remove... => replace all RepetJob() by TableJobAdaptator()... +class Test_RepetJob( unittest.TestCase ): + + def setUp(self): + self._jobTableName = "dummyJobTable" + self._db = DbMySql() + self._iRepetJob = RepetJob() + + def tearDown(self): + self._iRepetJob = None + self._db.close() + + def _createJobInstance(self): + return Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) + + def test_createJobTable_is_table_created(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + + isTableCreated = self._db.doesTableExist(self._jobTableName) + self.assertTrue(isTableCreated) + + self._db.dropTable(self._jobTableName) + + def test_createJobTable_field_list(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + + obsLFiled = self._db.getFieldList(self._jobTableName) + expLField = ["jobid", "jobname", "groupid", "command", "launcher", "queue", "status", "time", "node"] + + self.assertEquals(expLField, obsLFiled) + + self._db.dropTable(self._jobTableName) + + def test_recordJob(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + self._iRepetJob.recordJob(iJob) + + qryParams = "SELECT jobid, groupid, command, launcher, queue, status, node FROM " + self._jobTableName + " WHERE jobid = %s" + params = (iJob.jobid) + + self._db.execute(qryParams, params) + + tObs = self._db.fetchall()[0] + tExp =(iJob.jobid, iJob.groupid, iJob.command, iJob.launcher, iJob.queue, "waiting", "?") + + self.assertEquals(tExp,tObs) + + self._db.dropTable(self._jobTableName) + + def test_removeJob(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + self._iRepetJob.recordJob(iJob) + + self._iRepetJob.removeJob(iJob) + + isTableEmpty = self._db.isEmpty(self._jobTableName) + + self.assertTrue(isTableEmpty) + + self._db.dropTable(self._jobTableName) + + def test_getJobStatus(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + self._iRepetJob.recordJob(iJob) + + expStatus = "waiting" + obsStatus = self._iRepetJob.getJobStatus(iJob) + + self.assertEquals(expStatus, obsStatus) + self._db.dropTable(self._jobTableName) + + def test_getJobStatus_unknown(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + + expStatus = "unknown" + obsStatus = self._iRepetJob.getJobStatus(iJob) + + self.assertEquals(expStatus, obsStatus) + self._db.dropTable(self._jobTableName) + + def test_getJobStatus_no_name(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = Job( self._jobTableName, 20, "", "groupid", "queue", "command", "launcherFile", "node", "lResources" ) + + expStatus = "unknown" + obsStatus = self._iRepetJob.getJobStatus(iJob) + + self.assertEquals(expStatus, obsStatus) + self._db.dropTable(self._jobTableName) + + def test_getJobStatus_non_unique_job(self): + # Warning : this case will not append, because recordJob() begin by removeJob() + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + sqlCmd = "INSERT INTO %s" % ( iJob.tablename ) + sqlCmd += " VALUES (" + sqlCmd += " \"%s\"," % ( iJob.jobid ) + sqlCmd += " \"%s\"," % ( iJob.jobname ) + sqlCmd += " \"%s\"," % ( iJob.groupid ) + sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") ) + sqlCmd += " \"%s\"," % ( iJob.launcher ) + sqlCmd += " \"%s\"," % ( iJob.queue ) + sqlCmd += " \"waiting\"," + sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) ) + sqlCmd += " \"?\" );" + self._db.execute( sqlCmd ) + self._db.execute( sqlCmd ) + + expError = "expError.txt" + expErrorHandler = open(expError, "w") + expErrorHandler.write("ERROR while getting job status: non-unique jobs\n") + expErrorHandler.close() + + obsError = "obsError.txt" + obsErrorHandler = open(obsError, "w") + stderrRef = sys.stderr + sys.stderr = obsErrorHandler + + isSysExitRaised = False + try: + self._iRepetJob.getJobStatus(iJob) + except SystemExit: + isSysExitRaised = True + + obsErrorHandler.close() + + self.assertTrue(isSysExitRaised) + self.assertTrue(FileUtils.are2FilesIdentical(expError, obsError)) + + sys.stderr = stderrRef + os.remove(obsError) + os.remove(expError) + + self._db.dropTable(self._jobTableName) + + def test_updateInfoTable(self): + self._iRepetJob.updateInfoTable(self._jobTableName, "dummyInfo") + + qryParams = "SELECT name, file FROM info_tables WHERE name=%s AND file=%s" + params = (self._jobTableName, "dummyInfo") + + self._db.execute(qryParams, params) + tObs = self._db.fetchall()[0] + tExp = (self._jobTableName, "dummyInfo") + + self.assertEquals(tExp, tObs) + + self._db.dropTable(self._jobTableName) + + def test_changeJobStatus(self): + expStatus = "finished" + + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + self._iRepetJob.recordJob(iJob) + self._iRepetJob.changeJobStatus(iJob, expStatus, "method") + + qryParams = "SELECT status FROM " + self._jobTableName + " WHERE jobid =%s AND groupid=%s AND queue=%s" + params = (iJob.jobid, iJob.groupid, iJob.queue) + self._db.execute(qryParams, params) + + obsStatus = self._db.fetchall()[0][0] + self.assertEquals(expStatus, obsStatus) + self._iRepetJob.removeJob(iJob) + self._db.dropTable(self._jobTableName) + + def test_getCountStatus(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + + iJob1 = self._createJobInstance() + iJob2 = Job(self._jobTableName, 1, "job2", "groupid", "queue2", "command2", "launcherFile2", "node2", "lResources2") + + self._iRepetJob.recordJob(iJob1) + self._iRepetJob.recordJob(iJob2) + + expCount = 2 + obsCount = self._iRepetJob.getCountStatus(self._jobTableName, iJob1.groupid, "waiting") + + self.assertEquals(expCount, obsCount) + + def test_getCountStatus_without_res(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + expCount = 0 + + obsCount = self._iRepetJob.getCountStatus(self._jobTableName, "groupid", "waiting") + + self.assertEquals(expCount, obsCount) + + def test_cleanJobGroup(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob1 = self._createJobInstance() + iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") + iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") + + self._iRepetJob.recordJob(iJob1) + self._iRepetJob.recordJob(iJob2) + self._iRepetJob.recordJob(iJob3) + + self._iRepetJob.cleanJobGroup(self._jobTableName, iJob1.groupid) + + qryParams = "SELECT count(*) FROM " + self._jobTableName + + self._db.execute(qryParams) + + expCount = 1 + obsCount = self._db.fetchall()[0][0] + + self.assertEquals(expCount, obsCount) + + self._iRepetJob.removeJob(iJob3) + self._db.dropTable(self._jobTableName) + + def test_hasUnfinishedJob(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob1 = self._createJobInstance() + iJob2 = Job(self._jobTableName, 0, "jobname2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") + iJob3 = Job(self._jobTableName, 0, "jobname3", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") + + self._iRepetJob.recordJob(iJob1) + self._iRepetJob.recordJob(iJob2) + self._iRepetJob.recordJob(iJob3) + + self._iRepetJob.changeJobStatus(iJob2, "finished", "method") + + expHasGrpIdFinished = True + obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid) + + self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) + + self._iRepetJob.removeJob(iJob1) + self._iRepetJob.removeJob(iJob2) + self._iRepetJob.removeJob(iJob3) + self._db.dropTable(self._jobTableName) + + def test_hasUnfinishedJob_JobTableNotExists(self): + iJob1 = self._createJobInstance() + + expHasGrpIdFinished = False + obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid) + + self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) + + def test_hasUnfinishedJob_AllJobsFinished(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob1 = self._createJobInstance() + iJob2 = Job(self._jobTableName, "jobid2", iJob1.groupid, "queue2", "command2", "launcherFile2", "node2", "lResources2") + iJob3 = Job(self._jobTableName, "jobid2", "groupid3", "queue2", "command2", "launcherFile2", "node2", "lResources2") + + self._iRepetJob.recordJob(iJob1) + self._iRepetJob.recordJob(iJob2) + self._iRepetJob.recordJob(iJob3) + + self._iRepetJob.changeJobStatus(iJob1, "finished", "method") + self._iRepetJob.changeJobStatus(iJob2, "finished", "method") + + expHasGrpIdFinished = False + obsHasGrpIdFinished = self._iRepetJob.hasUnfinishedJob(self._jobTableName, iJob1.groupid) + + self.assertEquals(expHasGrpIdFinished, obsHasGrpIdFinished) + + self._iRepetJob.removeJob(iJob1) + self._iRepetJob.removeJob(iJob2) + self._iRepetJob.removeJob(iJob3) + self._db.dropTable(self._jobTableName) + + def test_waitJobGroup_with_finished_job(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + self._iRepetJob.recordJob(iJob) + self._iRepetJob.changeJobStatus(iJob, "finished", "method") + + Obs = self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0) + Exp = None + + self.assertEquals(Exp, Obs) + self._iRepetJob.removeJob(iJob) + + def test_waitJobGroup_with_error_job_maxRelaunch_zero(self): + Obs = False + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + self._iRepetJob.recordJob(iJob) + self._iRepetJob.changeJobStatus(iJob, "error", "method") + + try: + self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 0) + except SystemExit: + Obs = True + + self.assertTrue(Obs) + self._iRepetJob.removeJob(iJob) + + def test_setJobIdFromSge(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance() + self._iRepetJob.recordJob(iJob) + self._iRepetJob.setJobIdFromSge(iJob, 1000) + + qryParams = "SELECT jobid FROM " + self._jobTableName + " WHERE jobname = %s AND queue = %s AND groupid = %s" + params = (iJob.jobname, iJob.queue, iJob.groupid) + + self._db.execute(qryParams, params) + + tObs = self._db.fetchall()[0] + tExp =(1000,) + + self.assertEquals(tExp,tObs) + + self._db.dropTable(self._jobTableName) + + def test_submitJob_8_fields_for_job_table(self): + iJob = self._createJobInstance() + self._db.dropTable(self._jobTableName) + sqlCmd = "CREATE TABLE " + self._jobTableName + sqlCmd += " ( jobid INT UNSIGNED" + sqlCmd += ", groupid VARCHAR(255)" + sqlCmd += ", command TEXT" + sqlCmd += ", launcher VARCHAR(1024)" + sqlCmd += ", queue VARCHAR(255)" + sqlCmd += ", status VARCHAR(255)" + sqlCmd += ", time DATETIME" + sqlCmd += ", node VARCHAR(255) )" + self._db.execute(sqlCmd) + + self._iRepetJob.submitJob(iJob) + + expFieldsNb = 9 + obsFieldsNb = len(self._iRepetJob.getFieldList(self._jobTableName)) + + self.assertEquals(expFieldsNb, obsFieldsNb) + + self._db.dropTable(self._jobTableName) + + def test_getNodesListByGroupId(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) + iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) + iJob3 = Job( self._jobTableName, 2, "job3", "groupid2", "queue", "command", "launcherFile", "node3", "lResources" ) + + self._insertJob(iJob1) + self._insertJob(iJob2) + self._insertJob(iJob3) + + expNodeList = ["node1", "node2"] + obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid") + self.assertEquals(expNodeList, obsNodeList) + + self._db.dropTable(self._jobTableName) + + def test_getNodesListByGroupId_empty_list(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob1 = Job( self._jobTableName, 0, "job1", "groupid", "queue", "command", "launcherFile", "node1", "lResources" ) + iJob2 = Job( self._jobTableName, 1, "job2", "groupid", "queue", "command", "launcherFile", "node2", "lResources" ) + iJob3 = Job( self._jobTableName, 2, "job3", "groupid32", "queue", "command", "launcherFile", "node3", "lResources" ) + + self._insertJob(iJob1) + self._insertJob(iJob2) + self._insertJob(iJob3) + + expNodeList = [] + obsNodeList = self._iRepetJob.getNodesListByGroupId(self._jobTableName, "groupid3") + self.assertEquals(expNodeList, obsNodeList) + + self._db.dropTable(self._jobTableName) + + def _insertJob(self, iJob): + self._iRepetJob.removeJob( iJob ) + sqlCmd = "INSERT INTO %s" % ( iJob.tablename ) + sqlCmd += " VALUES (" + sqlCmd += " \"%s\"," % ( iJob.jobid ) + sqlCmd += " \"%s\"," % ( iJob.jobname ) + sqlCmd += " \"%s\"," % ( iJob.groupid ) + sqlCmd += " \"%s\"," % ( iJob.command.replace("\"","\'") ) + sqlCmd += " \"%s\"," % ( iJob.launcher ) + sqlCmd += " \"%s\"," % ( iJob.queue ) + sqlCmd += " \"waiting\"," + sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) ) + sqlCmd += " \"%s\" );" % ( iJob.node ) + self._iRepetJob.execute( sqlCmd ) + +if __name__ == "__main__": + unittest.main()