Mercurial > repos > yufei-luo > s_mart
view smart_toolShed/commons/core/sql/test/Test_F_TableJobAdaptator.py @ 0:e0f8dcca02ed
Uploaded S-MART tool, a toolbox for managing RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line source
from commons.core.launcher.WriteScript import WriteScript
from commons.core.sql.Job import Job
from commons.core.sql.DbFactory import DbFactory
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
import sys
import stat
import os
import time
import unittest
import glob


class Test_F_TableJobAdaptator(unittest.TestCase):
    """Functional tests of the job adaptator built by TableJobAdaptatorFactory.

    Each test works on a scratch jobs table named "dummyJobTable" that is
    (re)created by the test itself and dropped in tearDown. Launcher scripts
    are written into the current working directory.
    """

    def setUp(self):
        self._jobTableName = "dummyJobTable"
        self._db = DbFactory.createInstance()
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)

    def tearDown(self):
        self._db.dropTable(self._jobTableName)
        self._db.close()

    def test_submitJob_with_multiple_jobs(self):
        """Submit three jobs and expect each to be "finished" with log files."""
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        lJobs = []
        try:
            for name in ("job1", "job2", "job3"):
                iJob = _createJobInstance(name)
                _createLauncherFile(iJob, self._iTJA)
                lJobs.append(iJob)

            for iJob in lJobs:
                self._iTJA.submitJob(iJob, maxNbWaitingJobs=3, checkInterval=5, verbose=0)

            # Fixed delay before looking at the job status and log files.
            time.sleep(120)

            lObsJobStatus = [self._iTJA.getJobStatus(iJob) for iJob in lJobs]
            # Log files must be looked up before the cleanup below removes them.
            lHasLogFiles = [_hasLogFiles(iJob) for iJob in lJobs]
        finally:
            # Always remove generated files, even if a step above raised.
            # NOTE(review): "*.e*" / "*.o*" match more than the files of these
            # jobs -- confirm the tests are always run from a scratch directory.
            os.system("rm launcherFileTest*.py *.e* *.o*")

        expJobStatus = "finished"
        for obsJobStatus in lObsJobStatus:
            self.assertEqual(expJobStatus, obsJobStatus)
        for hasLogFiles in lHasLogFiles:
            self.assertTrue(hasLogFiles)

    def test_submitJob_job_already_submitted(self):
        """Submitting a job that is already recorded must raise SystemExit."""
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        iJob = _createJobInstance("job")
        self._iTJA.recordJob(iJob)

        self.assertRaises(SystemExit, self._iTJA.submitJob, iJob)

    def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
        """A job recorded in "error" status must end up "finished" after waitJobGroup."""
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        iJob = _createJobInstance("job")
        try:
            _createLauncherFile(iJob, self._iTJA)

            self._iTJA.recordJob(iJob)
            self._iTJA.changeJobStatus(iJob, "error")

            # NOTE(review): positional arguments 0 and 2 are presumably the
            # check interval and the maximum number of relaunches (see the
            # test name) -- confirm against waitJobGroup's signature.
            self._iTJA.waitJobGroup(iJob.groupid, 0, 2)

            time.sleep(120)

            obsJobStatus = self._iTJA.getJobStatus(iJob)
            # Log files must be looked up before the cleanup below removes them.
            hasLogFiles = _hasLogFiles(iJob)
            self._iTJA.removeJob(iJob)
        finally:
            # Always remove generated files, even if a step above raised.
            os.system("rm launcherFileTest*.py *.e* *.o*")

        self.assertEqual("finished", obsJobStatus)
        self.assertTrue(hasLogFiles)


class Test_F_TableJobAdaptator_SGE(unittest.TestCase):
    """Functional tests that are only run when REPET_JOB_MANAGER is "sge".

    setUp creates the jobs table, a job instance and its launcher script;
    tearDown removes the launcher script(s) and drops the table.
    """

    def setUp(self):
        # .get() so that an unset variable reaches the explicit message below
        # instead of failing with a KeyError.
        jobManager = os.environ.get("REPET_JOB_MANAGER", "")
        if jobManager.lower() != "sge":
            print("ERROR: jobs manager is not SGE: REPET_JOB_MANAGER = %s." % jobManager)
            sys.exit(0)
        self._jobTableName = "dummyJobTable"
        self._db = DbFactory.createInstance()
        self._db.createTable(self._jobTableName, "jobs", overwrite = True)
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
        self._iJob = _createJobInstance("job")
        _createLauncherFile(self._iJob, self._iTJA)

    def tearDown(self):
        # Remove the launcher script written by setUp here rather than at the
        # end of each test, so it is also removed when a test fails early.
        os.system("rm launcherFileTest*.py")
        self._db.dropTable(self._jobTableName)
        self._db.close()

    def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
        """waitJobGroup must exit with an error message on stderr for a job
        recorded as "running" that was never submitted."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, "running")

        expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % self._iJob.jobid

        obsError = "obsError.txt"
        isSysExitRaised = False
        stderrRef = sys.stderr
        try:
            obsErrorHandler = open(obsError, "w")
            try:
                # Capture what the adaptator writes on stderr into a file.
                sys.stderr = obsErrorHandler
                try:
                    self._iTJA.waitJobGroup(self._iJob.groupid, timeOutPerJob = 3)
                except SystemExit:
                    isSysExitRaised = True
            finally:
                # Restore stderr whatever happened, so that an unexpected
                # exception does not leave the whole test run redirected.
                sys.stderr = stderrRef
                obsErrorHandler.close()

            with open(obsError, "r") as obsErrorHandler:
                obsMsg = obsErrorHandler.readline()
        finally:
            if os.path.exists(obsError):
                os.remove(obsError)

        self.assertTrue(isSysExitRaised)
        self.assertEqual(expMsg, obsMsg)

    def test_isJobStillHandledBySge_True(self):
        """A job that was just submitted must be reported as handled by SGE."""
        self._iTJA.submitJob(self._iJob)
        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
        self.assertTrue(isJobHandledBySge)

    def test_isJobStillHandledBySge_False(self):
        """A job that was only recorded must not be reported as handled by SGE."""
        self._iTJA.recordJob(self._iJob)
        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
        self.assertFalse(isJobHandledBySge)


def _createJobInstance(name):
    """Build a Job called `name` whose launcher is launcherFileTest_<name>.py
    in the current working directory.

    On the host "compute-2-46.local" the resource "test=TRUE" is requested.
    """
    lResources = []
    if os.environ.get("HOSTNAME") == "compute-2-46.local":
        lResources.append("test=TRUE")
    launcher = "%s/launcherFileTest_%s.py" % (os.getcwd(), name)
    # NOTE(review): the meaning of the positional arguments (0, name, "test",
    # "", command, launcher) is defined by the Job constructor -- confirm there.
    return Job(0, name, "test", "", "log = os.system(\"date;sleep 5;date\")", launcher, lResources=lResources)


def _createLauncherFile(iJob, iTJA):
    """Write the launcher script of `iJob` with WriteScript and make it
    readable, writable and executable by everyone."""
    iWriteScript = WriteScript(iJob, iTJA, os.getcwd(), os.getcwd())
    iWriteScript.run(iJob.command, "", iJob.launcher)
    os.chmod(iJob.launcher, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)


def _hasLogFiles(iJob):
    """Return True if the current directory holds at least one file starting
    with "<jobname>.e" and at least one starting with "<jobname>.o"."""
    lErrorFiles = glob.glob(iJob.jobname + ".e" + "*")
    lOutputFiles = glob.glob(iJob.jobname + ".o" + "*")
    return bool(lErrorFiles) and bool(lOutputFiles)


if __name__ == "__main__":
    unittest.main()