Mercurial > repos > yufei-luo > s_mart
view smart_toolShed/commons/core/sql/test/Tst_F_RepetJob.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line source
import os import time import sys import stat import unittest import glob from commons.core.sql.DbMySql import DbMySql from commons.core.sql.RepetJob import RepetJob from commons.core.sql.Job import Job class Test_F_RepetJob(unittest.TestCase): def setUp(self): self._jobTableName = "dummyJobTable" self._db = DbMySql() self._iRepetJob = RepetJob() self._configFileName = "dummyConfigFile" configF = open(self._configFileName, "w" ) configF.write( "[repet_env]\n" ) configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) ) configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) ) configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) ) configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) ) configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) ) configF.close() def tearDown(self): self._iRepetJob = None self._db.dropTable( self._jobTableName ) self._db.close() os.remove(self._configFileName) def test_submitJob_with_multiple_jobs(self): job1 = self._createJobInstance("job1") self._createLauncherFile(job1) job2 = self._createJobInstance("job2") self._createLauncherFile(job2) job3 = self._createJobInstance("job3") self._createLauncherFile(job3) self._iRepetJob.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) self._iRepetJob.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) self._iRepetJob.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 ) time.sleep(70) expJobStatus = "finished" obsJobStatus1 = self._iRepetJob.getJobStatus(job1) obsJobStatus2 = self._iRepetJob.getJobStatus(job2) obsJobStatus3 = self._iRepetJob.getJobStatus(job3) self.assertEquals(expJobStatus, obsJobStatus1) self.assertEquals(expJobStatus, obsJobStatus2) self.assertEquals(expJobStatus, obsJobStatus3) jobName1 = job1.jobname jobName2 = job2.jobname jobName3 = job3.jobname expErrorFilePrefix1 = jobName1+ ".e" expOutputFilePrefix1 = jobName1 + ".o" expErrorFilePrefix2 = jobName2 + ".e" expOutputFilePrefix2 = jobName2 + ".o" expErrorFilePrefix3 = jobName3 + ".e" expOutputFilePrefix3 = jobName3 + ".o" lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*") lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*") lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*") lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*") isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0) isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0) isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0) isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0) os.system("rm launcherFileTest*.py *.e* *.o*") self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2) self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3) def test_submitJob_job_already_submitted(self): self._iRepetJob.createTable(self._jobTableName, "jobs") iJob = self._createJobInstance("job") self._iRepetJob.recordJob(iJob) isSysExitRaised = False try: self._iRepetJob.submitJob(iJob) except SystemExit: isSysExitRaised = True self.assertTrue(isSysExitRaised) def test_waitJobGroup_with_several_nbTimeOut_waiting(self): self._iRepetJob.createTable(self._jobTableName, "jobs") iJob = self._createJobInstance("job") self._createLauncherFile(iJob) self._iRepetJob.recordJob(iJob) self._iRepetJob.changeJobStatus(iJob, "running", "method") expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % ( iJob.jobid ) obsError = "obsError.txt" obsErrorHandler = open(obsError, "w") stderrRef = sys.stderr sys.stderr = obsErrorHandler isSysExitRaised = False try: self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, timeOutPerJob=3) except SystemExit: isSysExitRaised = True obsErrorHandler.close() obsErrorHandler = open(obsError, "r") obsMsg = obsErrorHandler.readline() obsErrorHandler.close() sys.stderr = stderrRef os.remove(obsError) os.system("rm launcherFileTest*.py") self.assertTrue(isSysExitRaised) self.assertEquals(expMsg, obsMsg) def test_waitJobGroup_with_error_job_maxRelaunch_two(self): self._iRepetJob.createTable(self._jobTableName, "jobs") iJob = self._createJobInstance("job") self._createLauncherFile(iJob) self._iRepetJob.recordJob(iJob) self._iRepetJob.changeJobStatus(iJob, "error", "method") self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 2) time.sleep(10) expJobStatus = "finished" obsJobStatus1 = self._iRepetJob.getJobStatus(iJob) self.assertEquals(expJobStatus, obsJobStatus1) jobName = iJob.jobname expErrorFilePrefix1 = jobName + ".e" expOutputFilePrefix1 = jobName + ".o" lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*") lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*") isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0) self._iRepetJob.removeJob(iJob) os.system("rm launcherFileTest*.py *.e* *.o*") self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1) def test_isJobStillHandledBySge_True(self): self._iRepetJob.createTable(self._jobTableName, "jobs") iJob = self._createJobInstance("job") self._createLauncherFile(iJob) self._iRepetJob.submitJob(iJob) isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname) os.system("rm launcherFileTest*.py") self.assertTrue(isJobHandledBySge) def test_isJobStillHandledBySge_False(self): self._iRepetJob.createTable(self._jobTableName, "jobs") iJob = self._createJobInstance("job") self._createLauncherFile(iJob) self._iRepetJob.recordJob(iJob) isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname) os.system("rm launcherFileTest*.py") self.assertFalse(isJobHandledBySge) def _createJobInstance(self, name): return Job(self._jobTableName, 0, name, "test", "", "date;sleep 5;date", "./launcherFileTest_"+ name +".py") def _createLauncherFile(self, iJob): jobFileHandler = open( iJob.launcher , "w" ) launcher = "#!/usr/bin/python\n" launcher += "import os\n" launcher += "import sys\n" launcher += "print \"system:\", os.uname()\n" launcher += "sys.stdout.flush()\n" newStatus = "running" prg = "%s/bin/srptChangeJobStatus.py" % (os.environ["REPET_PATH"]) cmd = prg cmd += " -t %s" % ( iJob.tablename ) cmd += " -n %s" % ( iJob.jobname ) cmd += " -g %s" % ( iJob.groupid ) if iJob.queue != "": cmd += " -q %s" % ( iJob.queue ) cmd += " -s %s" % ( newStatus ) cmd += " -c %s" %( self._configFileName ) cmd += " -v 1" launcher +="os.system( \"" + cmd + "\" )\n" launcher += "print \"LAUNCH: "+ iJob.command + "\"\n" launcher += "sys.stdout.flush()\n" launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n" launcher += "if exitStatus != 0:\n" launcher += "\tprint \"ERROR: "+ iJob.command + " returned exit status '%i'\" % ( exitStatus )\n" newStatus = "finished" prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py" cmd = prg cmd += " -t %s" % ( iJob.tablename ) cmd += " -n %s" % ( iJob.jobname ) cmd += " -g %s" % ( iJob.groupid ) if iJob.queue != "": cmd += " -q %s" % ( iJob.queue ) cmd += " -s %s" % ( newStatus ) cmd += " -c %s" %( self._configFileName ) cmd += " -v 1" launcher +="os.system( \"" + cmd + "\" )\n" launcher += "sys.exit(0)\n" jobFileHandler.write(launcher) jobFileHandler.close() os.chmod( iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC ) if __name__ == "__main__": unittest.main()