Mercurial > repos > yufei-luo > s_mart
view commons/core/launcher/test/Test_Launcher.py @ 14:c79b9ae3f65f
Deleted selected files
author | m-zytnicki |
---|---|
date | Fri, 19 Apr 2013 10:13:11 -0400 |
parents | 769e306b7933 |
children | 94ab73e8a190 |
line wrap: on
line source
from commons.core.utils.FileUtils import FileUtils from commons.core.launcher.Launcher import Launcher from commons.core.launcher.WriteScript import WriteScript from commons.core.stat.Stat import Stat from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory from commons.core.sql.DbFactory import DbFactory from commons.core.sql.Job import Job import unittest import os import shutil import time import stat #TODO: Test_F_Launcher.py : to execute prepareCommands() and runSingleJob() # to test runLauncherForMultipleJobs() #TODO: check clean of "Test_runSingleJob" #TODO: refactoring => choose between "self._queue" or "lResources" to set resources class Test_Launcher(unittest.TestCase): SARUMAN_NAME = "compute-2-46.local" def setUp(self): self._cDir = os.getcwd() self._tmpDir = self._cDir self._groupid = "test" self._jobTable = "dummyJobTable" self._iDb = DbFactory.createInstance() self._iDb.createTable(self._jobTable, "jobs", overwrite = True) self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable) self._queue = "" self._configFileName = "dummyConfigFile" def tearDown(self): self._iDb.dropTable(self._jobTable) self._iDb.close() FileUtils.removeFilesByPattern('*.e*') FileUtils.removeFilesByPattern('*.o*') FileUtils.removeFilesByPattern('launcherFileTest_BeginRun.py') FileUtils.removeFilesByPattern(self._configFileName) FileUtils.removeFilesByPattern('ClusterLauncher_*') def test__init__wrong_fields_for_job_table(self): self._iDb.dropTable(self._jobTable) sqlCmd = "CREATE TABLE " + self._jobTable sqlCmd += " ( jobid INT UNSIGNED" sqlCmd += ", jobname VARCHAR(255)" sqlCmd += ", groupid VARCHAR(255)" sqlCmd += ", command TEXT" sqlCmd += ", launcher VARCHAR(1024)" sqlCmd += ", queue VARCHAR(255)" sqlCmd += ", status VARCHAR(255)" sqlCmd += ", time DATETIME" sqlCmd += ", node VARCHAR(255) )" self._iDb.execute(sqlCmd) acronym = "Test__init__" iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"]) lObsFields = sorted(self._iDb.getFieldList(self._jobTable)) self.assertEquals(lExpFields, lObsFields) expJob = Job(queue = self._queue) obsJob = iLauncher.job self.assertEquals(expJob, obsJob) def test__init__withResources(self): queue = "main.q mem_free=3G" acronym = "Test__init__" expQueue = "main.q" explResources = ['mem_free=3G'] expJob = Job(queue = expQueue, lResources = explResources) iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", queue, self._groupid, acronym) obsJob = iLauncher.job self.assertEquals(expJob, obsJob) def test_createGroupidIfItNotExist(self): acronym = "checkGroupID" iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.createGroupidIfItNotExist() obsGroupid = iLauncher.job.groupid self.assertEquals(self._groupid, obsGroupid) def test_createGroupidIfItNotExist_without_groupid(self): groupid = "" acronym = "checkGroupID" iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, groupid, acronym) iLauncher.createGroupidIfItNotExist() obsGroupid = iLauncher.job.groupid self.assertTrue(obsGroupid != "") def test_beginRun_with_Job_finished_in_Table(self): acronym = "BeginRun" iJob = Job(queue = self._queue) self._jobdb.recordJob(iJob) self._jobdb.changeJobStatus(iJob, "finished") iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.beginRun() self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0) def test_beginRun_with_Job_unfinished_in_Table(self): acronym = "testU_BeginRun" cmd_start = "log = os.system( \"date;sleep 10;date\" )\n" pyFileName = "%s/launcherFileTest_BeginRun.py" % os.getcwd() if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"]) else: iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName) iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir) iWriteScript.run(cmd_start, "", pyFileName) os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) self._jobdb.submitJob(iJob) iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.beginRun() self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 1) def test_getStatsOfExecutionTime(self): acronym = "test_statTime" expLValues = [1000.00000, 1000.00000] expStat = Stat(expLValues) f = open(acronym +".o1", "w") f.write("executionTime=1000.000000") f.close() f = open(acronym +".o2", "w") f.write("executionTime=1000.000000") f.close() iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) obsStat = iLauncher.getStatsOfExecutionTime(acronym) self.assertEqual(expStat, obsStat) def test_endRun(self): acronym = "testU_EndRun" cmd_start = "log = os.system( \"date;sleep 10;date\" )\n" pyFileName = "%s/launcherFileTest_EndRun.py" % os.getcwd() if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName, lResources=["test=TRUE"]) else: iJob = Job(1, acronym, self._groupid, "", cmd_start, pyFileName) iWriteScript = WriteScript(iJob, self._jobdb, self._cDir, self._tmpDir) iWriteScript.run(cmd_start, "", pyFileName) os.chmod(pyFileName, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) self._jobdb.submitJob(iJob) iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.job.groupid = self._groupid iLauncher.endRun() self.assertTrue(self._jobdb.getCountStatus(self._groupid, "finished") == 0) self.assertTrue(self._jobdb.getCountStatus(self._groupid, "error") == 0) self.assertTrue(self._jobdb.getCountStatus(self._groupid, "waiting") == 0) os.remove(iJob.launcher) def test_clean(self): acronym = "test_clean" f = open("ClusterLauncher" + acronym + ".py", "w") f.close() f = open(acronym + ".o1", "w") f.close() f = open(acronym + ".e1", "w") f.close() iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.clean(acronym) self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym + ".py")) def test_clean_without_acronym(self): acronym = "" acronym2 = "toto" f = open("ClusterLauncher" + acronym2 + ".py", "w") f.close() f = open(acronym2 + ".o1", "w") f.close() f = open(acronym2 + ".e1", "w") f.close() iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", self._cDir, self._tmpDir, "", self._queue, self._groupid, acronym2) iLauncher.clean(acronym) self.assertFalse(FileUtils.isRessourceExists("ClusterLauncher" + acronym2 + ".py")) def test_getQueueNameAndResources_queue_no_resource(self): configQueue = "all.q" expQueueName = "all.q" expResources = [] iLauncher = Launcher(self._jobdb) obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) self.assertEquals(expQueueName, obsQueueName) self.assertEquals(expResources, obsResources) def test_getQueueNameAndResources_queue_one_resource(self): configQueue = "test.q 'test=TRUE'" expQueueName = "test.q" expResources = ["test=TRUE"] iLauncher = Launcher(self._jobdb) obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) self.assertEquals(expQueueName, obsQueueName) self.assertEquals(expResources, obsResources) def test_getQueueNameAndResources_queue_two_resources(self): configQueue = "big.q 's_data=8G s_cpu=96:00:00'" expQueueName = "big.q" expResources = ["s_data=8G", "s_cpu=96:00:00"] iLauncher = Launcher(self._jobdb) obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) self.assertEquals(expQueueName, obsQueueName) self.assertEquals(expResources, obsResources) def test_getQueueNameAndResources_no_queue_no_resource(self): configQueue = "" expQueueName = "" expResources = [] iLauncher = Launcher(self._jobdb) obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) self.assertEquals(expQueueName, obsQueueName) self.assertEquals(expResources, obsResources) def test_getQueueNameAndResources_no_queue_one_resource(self): configQueue = "s_data=8G" expQueueName = "" expResources = ["s_data=8G"] iLauncher = Launcher(self._jobdb) obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) self.assertEquals(expQueueName, obsQueueName) self.assertEquals(expResources, obsResources) def test_getQueueNameAndResources_no_queue_two_resource(self): configQueue = "s_data=8G s_cpu=96:00:00" expQueueName = "" expResources = ["s_data=8G", "s_cpu=96:00:00"] iLauncher = Launcher(self._jobdb) obsQueueName, obsResources = iLauncher.getQueueNameAndResources(configQueue) self.assertEquals(expQueueName, obsQueueName) self.assertEquals(expResources, obsResources) # #TODO: test with at least 2 lines in cmd def test_runSingleJob(self): acronym = "Test_runSingleJob" os.mkdir(acronym) os.chdir(acronym) iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.job.groupid = self._groupid iLauncher.job.jobname = acronym iLauncher.job.queue = self._queue if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): iLauncher.job.lResources = ["test=TRUE"] cmd = "log = os.system(\"touch 'YuFei'\")\n" iLauncher.runSingleJob(cmd) time.sleep(20) jobStatus = self._jobdb.getJobStatus(iLauncher.job) os.chdir(self._cDir) shutil.rmtree(acronym) self.assertEqual(jobStatus, "finished") def test_runSingleJob_catch_error_wrong_tmpDir(self): acronym = "Test_runSingleJob_catch_error" os.mkdir(acronym) os.chdir(acronym) iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), "%s/toto" % self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.job.groupid = self._groupid iLauncher.job.jobname = acronym iLauncher.job.queue = self._queue if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): iLauncher.job.lResources = ["test=TRUE"] cmd = "log = os.system(\"touch 'YuFei'\")\n" iLauncher.runSingleJob(cmd) time.sleep(20) jobStatus = self._jobdb.getJobStatus(iLauncher.job) os.chdir(self._cDir) shutil.rmtree(acronym) self.assertEqual(jobStatus, "error") def test_runSingleJob_catch_error_wrong_cmd(self): acronym = "Test_runSingleJob_catch_error" os.mkdir(acronym) os.chdir(acronym) iLauncher = Launcher(self._jobdb, os.getcwd(), "", "", os.getcwd(), self._tmpDir, "", self._queue, self._groupid, acronym) iLauncher.job.groupid = self._groupid iLauncher.job.jobname = acronym iLauncher.job.queue = self._queue if Test_Launcher.SARUMAN_NAME == os.getenv("HOSTNAME"): iLauncher.job.lResources = ["test=TRUE"] cmd = "log = os.system(\"truc -i toto\")\n" iLauncher.runSingleJob(cmd) time.sleep(20) jobStatus = self._jobdb.getJobStatus(iLauncher.job) self._jobdb.cleanJobGroup(self._groupid) os.chdir(self._cDir) shutil.rmtree(acronym) self.assertEqual(jobStatus, "error") def test_prepareCommands(self): expCmdStart = "os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")\n\tos.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")\n\tlog = os.system(\"touch file\")\n\t" expCmdFinish = "if os.path.exists(\"yufei.align\"):\n\t\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )\n\t" expCmdSize = "fileSize = 3.2\n\t\t" expCmdCopy = "shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")\n\t\tshutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")\n\t\t" lCmdStart = [] lCmdStart.append("os.symlink(\"../Yufei_chunks.fa\", \"Yufei_chunks.fa\")") lCmdStart.append("os.symlink(\"../Yufei_chunks.fa_cut\", \"Yufei_chunks.fa_cut\")") lCmds = [] lCmds.append("log = os.system(\"touch file\")") lCmdFinish = [] lCmdFinish.append("if os.path.exists(\"yufei.align\"):") lCmdFinish.append("\tshutil.move(\"yufei.align\", \"yufeiLuo/.\" )") lCmdSize = [] lCmdSize.append("fileSize = 3.2") lCmdCopy = [] lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa\", \".\")") lCmdCopy.append("shutil.copy(\"PY/Yufei_db/Yufei_chunks.fa_cut\", \".\")") iLauncher = Launcher(self._jobdb) obsCmdStart, obsCmdFinish, obsCmdSize, obsCmdCopy = iLauncher.prepareCommands(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy) self.assertEquals(expCmdStart, obsCmdStart) self.assertEquals(expCmdFinish, obsCmdFinish) self.assertEquals(expCmdSize, obsCmdSize) self.assertEquals(expCmdCopy, obsCmdCopy) def test_getSystemCommand(self): prg = "touch" lArgs = [] lArgs.append("file") expCmd = "log = os.system(\"touch file\")" iLauncher = Launcher(self._jobdb) obsCmd = iLauncher.getSystemCommand(prg, lArgs) self.assertEquals(expCmd, obsCmd) test_suite = unittest.TestSuite() test_suite.addTest( unittest.makeSuite( Test_Launcher ) ) if __name__ == "__main__": unittest.TextTestRunner(verbosity=2).run( test_suite )