Mercurial > repos > yufei-luo > s_mart
view commons/core/launcher/test/Test_WriteScript.py @ 14:c79b9ae3f65f
Deleted selected files
author | m-zytnicki |
---|---|
date | Fri, 19 Apr 2013 10:13:11 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
from commons.core.utils.FileUtils import FileUtils from commons.core.launcher.WriteScript import WriteScript from commons.core.sql.Job import Job from commons.core.sql.DbFactory import DbFactory from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory import unittest import os import shutil import time import threading class Test_WriteScript(unittest.TestCase): def setUp(self): self._testDir = os.getcwd() self._acronym = "dummyAcronym" self._jobTable = "dummyJobsTable" self._iDb = DbFactory.createInstance() self._iDb.createTable(self._jobTable, "jobs", overwrite = True) self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable) self._job = Job() self._job.groupid = "groupid" self._job.jobname = self._acronym self._job.launcher = "ClusterLauncher" self._jobdb.recordJob(self._job) self._dummyScratch = "dummyScratch" os.mkdir(self._dummyScratch) os.chdir(self._dummyScratch) self._tmpDir = os.getcwd() self._iScriptWriter = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir) def tearDown(self): self._iDb.dropTable(self._jobTable) self._iDb.close() if FileUtils.isRessourceExists(self._dummyScratch): shutil.rmtree(self._dummyScratch) def test_run(self): isScriptAsRun = False fileToCreate = 'dummyFile' cmdStart = "log = os.system( \"touch %s\" )\n" % fileToCreate cmdFinish = "os.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir) pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym) self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName) os.system("python %s" % pyFileName) os.chdir(self._testDir) if FileUtils.isRessourceExists(fileToCreate): os.remove(fileToCreate) isScriptAsRun = True expJobStatus = "finished" obsJobStatus = self._jobdb.getJobStatus(self._job) self.assertTrue(isScriptAsRun) self.assertEquals(expJobStatus, obsJobStatus) def test_run_with_cmdSize_and_cmdCopy(self): isScriptAsRun = False fileToCreate = 'dummyFile' fileSize = 0.5 cmdSize = "fileSize = %f\n" % fileSize cmdCopy = "os.system(\"touch bank.fa\")\n" cmdStart = "log = os.system(\"touch %s\")\n" % fileToCreate cmdFinish = "shutil.move(\"%s\", \"%s\")" % (fileToCreate, self._testDir) pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym) iWriteScript = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir, True) iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy) os.system("python %s" % pyFileName) os.chdir(self._testDir) if FileUtils.isRessourceExists(fileToCreate): os.remove(fileToCreate) isScriptAsRun = True expJobStatus = "finished" obsJobStatus = self._jobdb.getJobStatus(self._job) self.assertTrue(isScriptAsRun) self.assertEquals(expJobStatus, obsJobStatus) #TODO: how to test ? # def test_run_2_jobs_trying_to_create_same_groupIdDir(self): # fileToCreate1 = 'dummyFile1' # fileToCreate2 = 'dummyFile2' # flagFileOSError = "osErrorRaised" # # fileSize = 0.5 # cmd_checkSize = "" # cmd_checkSize += "if not os.path.exists( \"%s\" ):\n" % self._job.groupid # cmd_checkSize += "\tfileSize = %f\n" % fileSize # # cmd_checkGroupidDir1 = "" # cmd_checkGroupidDir1 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid # cmd_checkGroupidDir1 += "\ttry:\n" # cmd_checkGroupidDir1 += "\t\ttime.sleep(10)\n" # cmd_checkGroupidDir1 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid # cmd_checkGroupidDir1 += "\texcept OSError, e :\n" # cmd_checkGroupidDir1 += "\t\tos.system(\"touch %s\")\n" % flagFileOSError # cmd_checkGroupidDir1 += "\t\tif e.args[0] != 17:\n" # cmd_checkGroupidDir1 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid # cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid # cmd_checkGroupidDir1 += "\tos.system(\"touch bank.fa\")\n" #cp # cmd_checkGroupidDir1 += "else:\n" # cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid # # cmdStart1 = "log = os.system(\"touch %s\")\n" % fileToCreate1 # cmdFinish1 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate1, self._testDir) # pyFileName1 = "%s/ClusterLauncher1_job1.py" % os.getcwd() # # cmd_checkGroupidDir2 = "" # cmd_checkGroupidDir2 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid # cmd_checkGroupidDir2 += "\ttry:\n" # cmd_checkGroupidDir2 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid # cmd_checkGroupidDir2 += "\texcept OSError, e :\n" # cmd_checkGroupidDir2 += "\t\tif e.args[0] != 17:\n" # cmd_checkGroupidDir2 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid # cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid # cmd_checkGroupidDir2 += "\tos.system(\"touch bank.fa\")\n" #cp # cmd_checkGroupidDir2 += "else:\n" # cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid # # cmdStart2 = "log = os.system(\"touch %s\")\n" % fileToCreate2 # cmdFinish2 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate2, self._testDir) # pyFileName2 = "%s/ClusterLauncher2_job2.py" % os.getcwd() # # job1 = Job(self._jobTable, jobname = "job1", groupid = self._job.groupid) # self._jobdb.recordJob(job1) # job2 = Job(self._jobTable, jobname = "job2", groupid = self._job.groupid) # self._jobdb.recordJob(job2) # iScriptWriter1 = WriteScript(job1, self._jobdb, self._testDir, self._tmpDir) # iScriptWriter1.run(cmdStart1, cmdFinish1, pyFileName1, cmd_checkSize, cmd_checkGroupidDir1) # iScriptWriter2 = WriteScript(job2, self._jobdb, self._testDir, self._tmpDir) # iScriptWriter2.run(cmdStart2, cmdFinish2, pyFileName2, cmd_checkSize, cmd_checkGroupidDir2) # # iCFT1 = CreateFileThread(pyFileName1) # iCFT2 = CreateFileThread(pyFileName2) # iCFT1.start() # iCFT2.start() # while iCFT1.isAlive() or iCFT2.isAlive(): # time.sleep(5) # self.assertTrue(FileUtils.isRessourceExists(flagFileOSError)) # os.chdir(self._testDir) # # if FileUtils.isRessourceExists(fileToCreate1): # os.remove(fileToCreate1) # # if FileUtils.isRessourceExists(fileToCreate2): # os.remove(fileToCreate2) def test_run_2_lines_in_cmd_start(self): isScriptAsRun = False fileToCreate = 'dummyFile' cmdStart = "log = 0\n\t" cmdStart += "if True:\n\t" cmdStart += "\tos.system( \"touch dummyFile\" )\n" cmdFinish = "os.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir) pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym) self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName) os.system("python %s" % pyFileName) os.chdir(self._testDir) if FileUtils.isRessourceExists(fileToCreate): os.remove(fileToCreate) isScriptAsRun = True self.assertTrue(isScriptAsRun) def test_run_2_lines_in_cmd_finish(self): isScriptAsRun = False fileToCreate = 'dummyFile' cmdStart = "log = 0\n\t" cmdStart += "if True:\n\t" cmdStart += "\tos.system( \"touch dummyFile\" )\n" cmdFinish = "if True:\n\t" cmdFinish += "\tos.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir) pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym) self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName) os.system("python %s" % pyFileName) os.chdir(self._testDir) if FileUtils.isRessourceExists(fileToCreate): os.remove(fileToCreate) isScriptAsRun = True self.assertTrue(isScriptAsRun) def test_fillTemplate_with_JobScriptTemplate(self): os.chdir("..") d = { "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch", "jobTableName" : "dummyJobsTable", "groupId" : "groupid", "jobName" : "job1", "launcher" : "ClusterLauncher", "time" : "20110505-105353", "repet_path" : "/home/user/workspace/repet_pipe", "repet_host" : "pisano", "repet_user" : "user", "repet_pw" : "user", "repet_db" : "repet_user", "repet_port" : "3306", "cmdStart" : "log = os.system(\"touch dummyFile1\")", "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")", "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/" } expFileName = "expFiles/expJobScriptTemplate.py" obsFileName = "obsFile.py" iWS = WriteScript() iWS.fillTemplate(obsFileName, d) self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName)) os.remove(obsFileName) def test_fillTemplate_with_JobScriptTemplate_2_lines_in_cmd_start(self): os.chdir("..") d = { "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch", "jobTableName" : "dummyJobsTable", "groupId" : "groupid", "jobName" : "job1", "launcher" : "ClusterLauncher", "time" : "20110505-105353", "repet_path" : "/home/user/workspace/repet_pipe", "repet_host" : "pisano", "repet_user" : "user", "repet_pw" : "user", "repet_db" : "repet_user", "repet_port" : "3306", "cmdStart" : "print \"Hello Yufei\"\n\tlog = os.system(\"touch dummyFile1\")", "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")", "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/" } expFileName = "expFiles/expJobScriptTemplate_cmdWith2Lines.py" obsFileName = "obsFile.py" iWS = WriteScript() iWS.fillTemplate(obsFileName, d) self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName)) os.remove(obsFileName) def test_fillTemplate_with_JobScriptWithFilesCopyTemplate(self): os.chdir("..") d = { "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch", "jobTableName" : "dummyJobsTable", "groupId" : "groupid", "jobName" : "job1", "launcher" : "ClusterLauncher", "time" : "20110505-105353", "repet_path" : "/home/user/workspace/repet_pipe", "repet_host" : "pisano", "repet_user" : "user", "repet_pw" : "user", "repet_db" : "repet_user", "repet_port" : "3306", "cmdStart" : "log = os.system(\"touch dummyFile1\")", "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")", "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/", "cmdSize" : "fileSize = 0.500000", "cmdCopy" : "os.system(\"touch bank.fa\")" } expFileName = "expFiles/expJobScriptWithFilesCopyTemplate.py" obsFileName = "obsFile.py" iWS = WriteScript(chooseTemplateWithCopy = True) iWS.fillTemplate(obsFileName, d) self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName)) os.remove(obsFileName) def test_fillTemplate_with_JobScriptTemplateLight(self): os.chdir("..") d = { "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch", "jobTableName" : "dummyJobsTable", "groupId" : "groupid", "jobName" : "job1", "launcher" : "ClusterLauncher", "time" : "20110505-105353", "repet_path" : "/home/user/workspace/repet_pipe", "cmdStart" : "log = os.system(\"touch dummyFile1\")", "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")", "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/", "cmdSize" : "fileSize = 0.500000", "cmdCopy" : "os.system(\"touch bank.fa\")" } expFileName = "expFiles/expJobScriptTemplateLight.py" obsFileName = "obs.py" iWS = WriteScript(chooseTemplateLight = True) iWS.fillTemplate(obsFileName, d) self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName)) os.remove(obsFileName) def test_createJobScriptDict(self): os.chdir("..") cmd_start = "log = os.system(\"touch dummyFile1\")" cmd_finish = "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")" cmd_size = "" cmd_copy = "" expDict = { "tmpDir" : self._tmpDir, "jobTableName" : self._jobTable, "groupId" : self._job.groupid, "jobName" : self._acronym, "launcher" : self._job.launcher, "time" : time.strftime("%Y%m%d-%H%M%S"), "repet_path" : os.environ["REPET_PATH"], "repet_host" : os.environ["REPET_HOST"], "repet_user" : os.environ["REPET_USER"], "repet_pw" : os.environ["REPET_PW"], "repet_db" : os.environ["REPET_DB"], "repet_port" : os.environ["REPET_PORT"], "cmdStart" : cmd_start, "cmdFinish" : cmd_finish, "cDir" : self._testDir, "cmdSize" : cmd_size, "cmdCopy" : cmd_copy } obsDict = self._iScriptWriter.createJobScriptDict(cmd_start, cmd_finish, cmd_size, cmd_copy) self.assertEquals(expDict, obsDict) def test_createJobScriptDict_with_cmdSize_and_cmdCopy(self): os.chdir("..") cmd_start = "log = os.system(\"touch dummyFile1\")" cmd_finish = "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")" cmd_size = "fileSize = 0.500000" cmd_copy = "os.system(\"touch bank.fa\")" expDict = { "tmpDir" : self._tmpDir, "jobTableName" : self._jobTable, "groupId" : self._job.groupid, "jobName" : self._acronym, "launcher" : self._job.launcher, "time" : time.strftime("%Y%m%d-%H%M%S"), "repet_path" : os.environ["REPET_PATH"], "repet_host" : os.environ["REPET_HOST"], "repet_user" : os.environ["REPET_USER"], "repet_pw" : os.environ["REPET_PW"], "repet_db" : os.environ["REPET_DB"], "repet_port" : os.environ["REPET_PORT"], "cmdStart" : cmd_start, "cmdFinish" : cmd_finish, "cDir" : self._testDir, "cmdSize" : cmd_size, "cmdCopy" : cmd_copy } obsDict = self._iScriptWriter.createJobScriptDict(cmd_start, cmd_finish, cmd_size, cmd_copy) self.assertEquals(expDict, obsDict) class CreateFileThread(threading.Thread): def __init__(self, pyFileName): threading.Thread.__init__(self) self._pyFileName = pyFileName def run(self): os.system("python %s" % self._pyFileName) test_suite = unittest.TestSuite() test_suite.addTest( unittest.makeSuite( Test_WriteScript ) ) if __name__ == "__main__": unittest.TextTestRunner(verbosity=2).run( test_suite )