Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/sql/test/Tst_F_RepetJob.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/sql/test/Tst_F_RepetJob.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,236 @@ +import os +import time +import sys +import stat +import unittest +import glob +from commons.core.sql.DbMySql import DbMySql +from commons.core.sql.RepetJob import RepetJob +from commons.core.sql.Job import Job + +class Test_F_RepetJob(unittest.TestCase): + + def setUp(self): + self._jobTableName = "dummyJobTable" + self._db = DbMySql() + self._iRepetJob = RepetJob() + self._configFileName = "dummyConfigFile" + configF = open(self._configFileName, "w" ) + configF.write( "[repet_env]\n" ) + configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) ) + configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) ) + configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) ) + configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) ) + configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) ) + configF.close() + + def tearDown(self): + self._iRepetJob = None + self._db.dropTable( self._jobTableName ) + self._db.close() + os.remove(self._configFileName) + + def test_submitJob_with_multiple_jobs(self): + job1 = self._createJobInstance("job1") + self._createLauncherFile(job1) + + job2 = self._createJobInstance("job2") + self._createLauncherFile(job2) + + job3 = self._createJobInstance("job3") + self._createLauncherFile(job3) + + self._iRepetJob.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) + self._iRepetJob.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) + self._iRepetJob.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) + + time.sleep(70) + + expJobStatus = "finished" + obsJobStatus1 = self._iRepetJob.getJobStatus(job1) + obsJobStatus2 = self._iRepetJob.getJobStatus(job2) + obsJobStatus3 = self._iRepetJob.getJobStatus(job3) + + self.assertEquals(expJobStatus, obsJobStatus1) + self.assertEquals(expJobStatus, obsJobStatus2) + self.assertEquals(expJobStatus, obsJobStatus3) + + jobName1 = job1.jobname + jobName2 = job2.jobname + jobName3 = job3.jobname + + expErrorFilePrefix1 = jobName1+ ".e" + expOutputFilePrefix1 = jobName1 + ".o" + expErrorFilePrefix2 = jobName2 + ".e" + expOutputFilePrefix2 = jobName2 + ".o" + expErrorFilePrefix3 = jobName3 + ".e" + expOutputFilePrefix3 = jobName3 + ".o" + + lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") + lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") + lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*") + lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*") + lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*") + lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*") + + isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) + isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) + isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0) + isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0) + isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0) + isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0) + + os.system("rm launcherFileTest*.py *.e* *.o*") + self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) + self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2) + self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3) + + def test_submitJob_job_already_submitted(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance("job") + self._iRepetJob.recordJob(iJob) + + isSysExitRaised = False + try: + self._iRepetJob.submitJob(iJob) + except SystemExit: + isSysExitRaised = True + self.assertTrue(isSysExitRaised) + + def test_waitJobGroup_with_several_nbTimeOut_waiting(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance("job") + self._createLauncherFile(iJob) + self._iRepetJob.recordJob(iJob) + self._iRepetJob.changeJobStatus(iJob, "running", "method") + + expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % ( iJob.jobid ) + + obsError = "obsError.txt" + obsErrorHandler = open(obsError, "w") + stderrRef = sys.stderr + sys.stderr = obsErrorHandler + + isSysExitRaised = False + try: + self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, timeOutPerJob=3) + except SystemExit: + isSysExitRaised = True + + obsErrorHandler.close() + + obsErrorHandler = open(obsError, "r") + obsMsg = obsErrorHandler.readline() + obsErrorHandler.close() + + sys.stderr = stderrRef + os.remove(obsError) + os.system("rm launcherFileTest*.py") + self.assertTrue(isSysExitRaised) + self.assertEquals(expMsg, obsMsg) + + def test_waitJobGroup_with_error_job_maxRelaunch_two(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance("job") + self._createLauncherFile(iJob) + + self._iRepetJob.recordJob(iJob) + self._iRepetJob.changeJobStatus(iJob, "error", "method") + + self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 2) + + time.sleep(10) + + expJobStatus = "finished" + obsJobStatus1 = self._iRepetJob.getJobStatus(iJob) + + self.assertEquals(expJobStatus, obsJobStatus1) + + jobName = iJob.jobname + + expErrorFilePrefix1 = jobName + ".e" + expOutputFilePrefix1 = jobName + ".o" + + lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") + lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") + + isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) + isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) + + self._iRepetJob.removeJob(iJob) + os.system("rm launcherFileTest*.py *.e* *.o*") + self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) + + + def test_isJobStillHandledBySge_True(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance("job") + self._createLauncherFile(iJob) + self._iRepetJob.submitJob(iJob) + + isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname) + os.system("rm launcherFileTest*.py") + + self.assertTrue(isJobHandledBySge) + + def test_isJobStillHandledBySge_False(self): + self._iRepetJob.createTable(self._jobTableName, "jobs") + iJob = self._createJobInstance("job") + self._createLauncherFile(iJob) + self._iRepetJob.recordJob(iJob) + + isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname) + os.system("rm launcherFileTest*.py") + + self.assertFalse(isJobHandledBySge) + + def _createJobInstance(self, name): + return Job(self._jobTableName, 0, name, "test", "", "date;sleep 5;date", "./launcherFileTest_"+ name +".py") + + def _createLauncherFile(self, iJob): + jobFileHandler = open( iJob.launcher , "w" ) + + launcher = "#!/usr/bin/python\n" + launcher += "import os\n" + launcher += "import sys\n" + + launcher += "print \"system:\", os.uname()\n" + launcher += "sys.stdout.flush()\n" + newStatus = "running" + prg = "%s/bin/srptChangeJobStatus.py" % (os.environ["REPET_PATH"]) + cmd = prg + cmd += " -t %s" % ( iJob.tablename ) + cmd += " -n %s" % ( iJob.jobname ) + cmd += " -g %s" % ( iJob.groupid ) + if iJob.queue != "": + cmd += " -q %s" % ( iJob.queue ) + cmd += " -s %s" % ( newStatus ) + cmd += " -c %s" %( self._configFileName ) + cmd += " -v 1" + launcher +="os.system( \"" + cmd + "\" )\n" + + launcher += "print \"LAUNCH: "+ iJob.command + "\"\n" + launcher += "sys.stdout.flush()\n" + launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n" + launcher += "if exitStatus != 0:\n" + launcher += "\tprint \"ERROR: "+ iJob.command + " returned exit status '%i'\" % ( exitStatus )\n" + + newStatus = "finished" + prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py" + cmd = prg + cmd += " -t %s" % ( iJob.tablename ) + cmd += " -n %s" % ( iJob.jobname ) + cmd += " -g %s" % ( iJob.groupid ) + if iJob.queue != "": + cmd += " -q %s" % ( iJob.queue ) + cmd += " -s %s" % ( newStatus ) + cmd += " -c %s" %( self._configFileName ) + cmd += " -v 1" + launcher +="os.system( \"" + cmd + "\" )\n" + launcher += "sys.exit(0)\n" + jobFileHandler.write(launcher) + jobFileHandler.close() + os.chmod( iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC ) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file