view commons/core/launcher/test/Test_WriteScript.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line source

from commons.core.utils.FileUtils import FileUtils
from commons.core.launcher.WriteScript import WriteScript
from commons.core.sql.Job import Job
from commons.core.sql.DbFactory import DbFactory
from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
import unittest
import os
import shutil
import time
import threading

class Test_WriteScript(unittest.TestCase):

    def setUp(self):
        """Prepare the job table, one recorded job and a scratch directory.

        On return the current working directory is the scratch directory
        (``self._tmpDir``), while ``self._testDir`` holds the directory the
        test was started from.
        """
        self._testDir = os.getcwd()
        self._acronym = "dummyAcronym"
        self._jobTable = "dummyJobsTable"
        self._iDb = DbFactory.createInstance()
        self._iDb.createTable(self._jobTable, "jobs", overwrite = True)
        self._jobdb = TableJobAdaptatorFactory.createInstance(self._iDb, self._jobTable)
        self._job = Job()
        self._job.groupid = "groupid"
        self._job.jobname = self._acronym
        self._job.launcher = "ClusterLauncher"
        self._jobdb.recordJob(self._job)
        # Keep the scratch location absolute so it can still be found (and
        # removed by tearDown) whatever the working directory is by then.
        self._dummyScratch = os.path.join(self._testDir, "dummyScratch")
        # A scratch directory left over from an interrupted run would make
        # os.mkdir() fail for every test; start from a clean one instead.
        if os.path.isdir(self._dummyScratch):
            shutil.rmtree(self._dummyScratch)
        os.mkdir(self._dummyScratch)
        os.chdir(self._dummyScratch)
        self._tmpDir = os.getcwd()
        self._iScriptWriter = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir)
        
    def tearDown(self):
        """Drop the job table, close the database and delete the scratch directory.

        Cleanup no longer depends on the test having left the scratch
        directory itself: a test that aborts early is still cleaned up.
        """
        # Go back to the start directory first. This makes the scratch path
        # resolvable even if the test died while still inside it, and keeps
        # the next setUp() from recording the wrong start directory.
        os.chdir(self._testDir)
        try:
            try:
                self._iDb.dropTable(self._jobTable)
            finally:
                # Release the connection even when dropping the table fails.
                self._iDb.close()
        finally:
            if FileUtils.isRessourceExists(self._dummyScratch):
                shutil.rmtree(self._dummyScratch)

    def test_run(self):
        # Write a job script with WriteScript.run(), execute it with the
        # "python" found on PATH, then check two things: the file named in
        # cmdStart/cmdFinish ended up in the test directory, and the job
        # status recorded in the job table is "finished".
        fileToCreate = 'dummyFile'
        cmdStart = "log = os.system( \"touch %s\" )\n" % fileToCreate
        cmdFinish = "os.system(\"mv %s %s\" )\n" % (fileToCreate, self._testDir)
        pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)

        self._iScriptWriter.run(cmdStart, cmdFinish, pyFileName)
        os.system("python %s" % pyFileName)

        # Leave the scratch directory before asserting, so that a failure
        # does not strand the process inside a directory about to be removed.
        os.chdir(self._testDir)
        isScriptAsRun = FileUtils.isRessourceExists(fileToCreate)
        if isScriptAsRun:
            os.remove(fileToCreate)
        expJobStatus = "finished"
        obsJobStatus = self._jobdb.getJobStatus(self._job)

        self.assertTrue(isScriptAsRun)
        # assertEqual is the supported spelling; assertEquals is a deprecated alias.
        self.assertEqual(expJobStatus, obsJobStatus)
        
    def test_run_with_cmdSize_and_cmdCopy(self):
        # Same scenario as test_run, but the script is written by a
        # WriteScript built with a fifth positional argument set to True
        # (presumably chooseTemplateWithCopy -- confirm against WriteScript)
        # and receives the extra cmdSize / cmdCopy snippets.
        fileToCreate = 'dummyFile'
        fileSize = 0.5
        cmdSize = "fileSize = %f\n" % fileSize
        cmdCopy = "os.system(\"touch bank.fa\")\n"
        cmdStart = "log = os.system(\"touch %s\")\n" % fileToCreate
        cmdFinish = "shutil.move(\"%s\", \"%s\")" % (fileToCreate, self._testDir)
        pyFileName = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)

        iWriteScript = WriteScript(self._job, self._jobdb, self._testDir, self._tmpDir, True)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        os.system("python %s" % pyFileName)

        # Leave the scratch directory before asserting, so that a failure
        # does not strand the process inside a directory about to be removed.
        os.chdir(self._testDir)
        isScriptAsRun = FileUtils.isRessourceExists(fileToCreate)
        if isScriptAsRun:
            os.remove(fileToCreate)
        expJobStatus = "finished"
        obsJobStatus = self._jobdb.getJobStatus(self._job)

        self.assertTrue(isScriptAsRun)
        # assertEqual is the supported spelling; assertEquals is a deprecated alias.
        self.assertEqual(expJobStatus, obsJobStatus)

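#TODO: find a reliable way to test two jobs racing to create the same groupid directory; the draft below stays disabled until then.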
#    def test_run_2_jobs_trying_to_create_same_groupIdDir(self):
#        fileToCreate1 = 'dummyFile1'
#        fileToCreate2 = 'dummyFile2'
#        flagFileOSError = "osErrorRaised"
#        
#        fileSize = 0.5
#        cmd_checkSize = ""
#        cmd_checkSize += "if not os.path.exists( \"%s\" ):\n" % self._job.groupid
#        cmd_checkSize += "\tfileSize = %f\n" % fileSize
#        
#        cmd_checkGroupidDir1 = ""
#        cmd_checkGroupidDir1 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\ttry:\n"
#        cmd_checkGroupidDir1 += "\t\ttime.sleep(10)\n"
#        cmd_checkGroupidDir1 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\texcept OSError, e :\n"
#        cmd_checkGroupidDir1 += "\t\tos.system(\"touch %s\")\n" % flagFileOSError
#        cmd_checkGroupidDir1 += "\t\tif e.args[0] != 17:\n"
#        cmd_checkGroupidDir1 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir1 += "\tos.system(\"touch bank.fa\")\n" #cp
#        cmd_checkGroupidDir1 += "else:\n"
#        cmd_checkGroupidDir1 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        
#        cmdStart1 = "log = os.system(\"touch %s\")\n" % fileToCreate1
#        cmdFinish1 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate1, self._testDir)
#        pyFileName1 = "%s/ClusterLauncher1_job1.py" % os.getcwd()
#       
#        cmd_checkGroupidDir2 = ""
#        cmd_checkGroupidDir2 += "if not os.path.exists(\"%s\"):\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\ttry:\n"
#        cmd_checkGroupidDir2 += "\t\tos.mkdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\texcept OSError, e :\n"
#        cmd_checkGroupidDir2 += "\t\tif e.args[0] != 17:\n"
#        cmd_checkGroupidDir2 += "\t\t\traise RepetException(\"ERROR: can't create '%s'\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        cmd_checkGroupidDir2 += "\tos.system(\"touch bank.fa\")\n" #cp
#        cmd_checkGroupidDir2 += "else:\n"
#        cmd_checkGroupidDir2 += "\tos.chdir(\"%s\")\n" % self._job.groupid
#        
#        cmdStart2 = "log = os.system(\"touch %s\")\n" % fileToCreate2
#        cmdFinish2 = "shutil.move(\"%s\", \"%s\")\n" % (fileToCreate2, self._testDir)
#        pyFileName2 = "%s/ClusterLauncher2_job2.py" % os.getcwd()
#            
#        job1 = Job(self._jobTable, jobname = "job1", groupid = self._job.groupid)
#        self._jobdb.recordJob(job1)
#        job2 = Job(self._jobTable, jobname = "job2", groupid = self._job.groupid)
#        self._jobdb.recordJob(job2)
#        iScriptWriter1 = WriteScript(job1, self._jobdb, self._testDir, self._tmpDir)
#        iScriptWriter1.run(cmdStart1, cmdFinish1, pyFileName1, cmd_checkSize, cmd_checkGroupidDir1)
#        iScriptWriter2 = WriteScript(job2, self._jobdb, self._testDir, self._tmpDir)
#        iScriptWriter2.run(cmdStart2, cmdFinish2, pyFileName2, cmd_checkSize, cmd_checkGroupidDir2)
#    
#        iCFT1 = CreateFileThread(pyFileName1)
#        iCFT2 = CreateFileThread(pyFileName2)
#        iCFT1.start()
#        iCFT2.start()
#        while iCFT1.isAlive() or iCFT2.isAlive():
#            time.sleep(5)
#        self.assertTrue(FileUtils.isRessourceExists(flagFileOSError))
#        os.chdir(self._testDir)
#        
#        if FileUtils.isRessourceExists(fileToCreate1):
#            os.remove(fileToCreate1)
#            
#        if FileUtils.isRessourceExists(fileToCreate2):            
#            os.remove(fileToCreate2)
    
    def test_run_2_lines_in_cmd_start(self):
        # A cmdStart made of several tab-indented lines must still yield a
        # runnable script: the file it touches has to show up in the test
        # directory once the script has been executed.
        producedFile = 'dummyFile'

        startLines = [
            "log = 0\n\t",
            "if True:\n\t",
            "\tos.system( \"touch dummyFile\" )\n",
        ]
        cmdStart = "".join(startLines)
        cmdFinish = "os.system(\"mv %s %s\" )\n" % (producedFile, self._testDir)
        scriptPath = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)

        self._iScriptWriter.run(cmdStart, cmdFinish, scriptPath)
        os.system("python %s" % scriptPath)

        os.chdir(self._testDir)
        scriptHasRun = False
        if FileUtils.isRessourceExists(producedFile):
            os.remove(producedFile)
            scriptHasRun = True
        self.assertTrue(scriptHasRun)

    def test_run_2_lines_in_cmd_finish(self):
        # Both cmdStart and cmdFinish span several tab-indented lines; the
        # script written from them must still run and deliver the touched
        # file into the test directory.
        producedFile = 'dummyFile'

        startLines = [
            "log = 0\n\t",
            "if True:\n\t",
            "\tos.system( \"touch dummyFile\" )\n",
        ]
        cmdStart = "".join(startLines)
        moveLine = "\tos.system(\"mv %s %s\" )\n" % (producedFile, self._testDir)
        cmdFinish = "if True:\n\t" + moveLine
        scriptPath = "%s/ClusterLauncher_%s.py" % (os.getcwd(), self._acronym)

        self._iScriptWriter.run(cmdStart, cmdFinish, scriptPath)
        os.system("python %s" % scriptPath)

        os.chdir(self._testDir)
        scriptHasRun = False
        if FileUtils.isRessourceExists(producedFile):
            os.remove(producedFile)
            scriptHasRun = True
        self.assertTrue(scriptHasRun)
        
    def test_fillTemplate_with_JobScriptTemplate(self):
        # Fill the template used by a WriteScript built without arguments
        # from a fixed set of values, and compare the generated file with
        # the stored reference script.
        # setUp left us inside the scratch directory; step back up so the
        # relative "expFiles/..." path resolves.
        os.chdir("..")
        expFileName = "expFiles/expJobScriptTemplate.py"
        obsFileName = "obsFile.py"
        templateValues = {
            "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
            "jobTableName" : "dummyJobsTable",
            "groupId" : "groupid",
            "jobName" : "job1",
            "launcher" : "ClusterLauncher",
            "time" : "20110505-105353",
            "repet_path" : "/home/user/workspace/repet_pipe",
            "repet_host" : "pisano",
            "repet_user" : "user",
            "repet_pw" : "user",
            "repet_db" : "repet_user",
            "repet_port" : "3306",
            "cmdStart" : "log = os.system(\"touch dummyFile1\")",
            "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
            "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/"
        }

        scriptWriter = WriteScript()
        scriptWriter.fillTemplate(obsFileName, templateValues)
        filesMatch = FileUtils.are2FilesIdentical(expFileName, obsFileName)
        self.assertTrue(filesMatch)
        os.remove(obsFileName)
        
    def test_fillTemplate_with_JobScriptTemplate_2_lines_in_cmd_start(self):
        # Same as the single-line template test, except that "cmdStart"
        # holds two statements separated by "\n\t"; the generated file is
        # compared with the matching reference script.
        # setUp left us inside the scratch directory; step back up so the
        # relative "expFiles/..." path resolves.
        os.chdir("..")
        expFileName = "expFiles/expJobScriptTemplate_cmdWith2Lines.py"
        obsFileName = "obsFile.py"
        templateValues = {
            "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
            "jobTableName" : "dummyJobsTable",
            "groupId" : "groupid",
            "jobName" : "job1",
            "launcher" : "ClusterLauncher",
            "time" : "20110505-105353",
            "repet_path" : "/home/user/workspace/repet_pipe",
            "repet_host" : "pisano",
            "repet_user" : "user",
            "repet_pw" : "user",
            "repet_db" : "repet_user",
            "repet_port" : "3306",
            "cmdStart" : "print \"Hello Yufei\"\n\tlog = os.system(\"touch dummyFile1\")",
            "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
            "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/"
        }

        scriptWriter = WriteScript()
        scriptWriter.fillTemplate(obsFileName, templateValues)
        filesMatch = FileUtils.are2FilesIdentical(expFileName, obsFileName)
        self.assertTrue(filesMatch)
        os.remove(obsFileName)
        
    def test_fillTemplate_with_JobScriptWithFilesCopyTemplate(self):
        # Fill the template selected by chooseTemplateWithCopy = True; the
        # values additionally carry "cmdSize" and "cmdCopy". The generated
        # file is compared with the stored reference script.
        # setUp left us inside the scratch directory; step back up so the
        # relative "expFiles/..." path resolves.
        os.chdir("..")
        expFileName = "expFiles/expJobScriptWithFilesCopyTemplate.py"
        obsFileName = "obsFile.py"
        templateValues = {
            "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
            "jobTableName" : "dummyJobsTable",
            "groupId" : "groupid",
            "jobName" : "job1",
            "launcher" : "ClusterLauncher",
            "time" : "20110505-105353",
            "repet_path" : "/home/user/workspace/repet_pipe",
            "repet_host" : "pisano",
            "repet_user" : "user",
            "repet_pw" : "user",
            "repet_db" : "repet_user",
            "repet_port" : "3306",
            "cmdStart" : "log = os.system(\"touch dummyFile1\")",
            "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
            "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/",
            "cmdSize" : "fileSize = 0.500000",
            "cmdCopy" : "os.system(\"touch bank.fa\")"
        }

        scriptWriter = WriteScript(chooseTemplateWithCopy = True)
        scriptWriter.fillTemplate(obsFileName, templateValues)
        filesMatch = FileUtils.are2FilesIdentical(expFileName, obsFileName)
        self.assertTrue(filesMatch)
        os.remove(obsFileName)

    def test_fillTemplate_with_JobScriptTemplateLight(self):
        # Fill the template selected by chooseTemplateLight = True; the
        # values carry no database settings (no repet_host/user/pw/db/port
        # keys). The generated file is compared with the reference script.
        # setUp left us inside the scratch directory; step back up so the
        # relative "expFiles/..." path resolves.
        os.chdir("..")
        expFileName = "expFiles/expJobScriptTemplateLight.py"
        obsFileName = "obs.py"
        templateValues = {
            "tmpDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/dummyScratch",
            "jobTableName" : "dummyJobsTable",
            "groupId" : "groupid",
            "jobName" : "job1",
            "launcher" : "ClusterLauncher",
            "time" : "20110505-105353",
            "repet_path" : "/home/user/workspace/repet_pipe",
            "cmdStart" : "log = os.system(\"touch dummyFile1\")",
            "cmdFinish" : "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")",
            "cDir" : "/home/user/workspace/repet_pipe/commons/core/launcher/test/",
            "cmdSize" : "fileSize = 0.500000",
            "cmdCopy" : "os.system(\"touch bank.fa\")"
        }

        scriptWriter = WriteScript(chooseTemplateLight = True)
        scriptWriter.fillTemplate(obsFileName, templateValues)
        filesMatch = FileUtils.are2FilesIdentical(expFileName, obsFileName)
        self.assertTrue(filesMatch)
        os.remove(obsFileName)
        
    def test_createJobScriptDict(self):
        # createJobScriptDict() must return the full substitution dictionary
        # for the job recorded in setUp, here with empty cmdSize / cmdCopy.
        # Requires the REPET_* environment variables to be set.
        os.chdir("..")
        cmd_start = "log = os.system(\"touch dummyFile1\")"
        cmd_finish = "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")"
        cmd_size = ""
        cmd_copy = ""
        timeFormat = "%Y%m%d-%H%M%S"
        expDict = {
             "tmpDir" : self._tmpDir,
             "jobTableName" : self._jobTable,
             "groupId" : self._job.groupid,
             "jobName" : self._acronym,
             "launcher" : self._job.launcher,
             "time" : time.strftime(timeFormat),
             "repet_path" : os.environ["REPET_PATH"],
             "repet_host" : os.environ["REPET_HOST"],
             "repet_user" : os.environ["REPET_USER"],
             "repet_pw" : os.environ["REPET_PW"],
             "repet_db" : os.environ["REPET_DB"],
             "repet_port" : os.environ["REPET_PORT"],
             "cmdStart" : cmd_start,
             "cmdFinish" : cmd_finish,
             "cDir" : self._testDir,
             "cmdSize" : cmd_size,
             "cmdCopy" : cmd_copy
             }
        obsDict = self._iScriptWriter.createJobScriptDict(cmd_start, cmd_finish, cmd_size, cmd_copy)
        # The "time" entry has one-second resolution and is presumably
        # stamped inside createJobScriptDict(). If the clock ticked over
        # between building expDict and the call, accept the later second
        # instead of failing spuriously.
        timeAfter = time.strftime(timeFormat)
        if obsDict.get("time") == timeAfter:
            expDict["time"] = timeAfter
        # assertEqual is the supported spelling; assertEquals is a deprecated alias.
        self.assertEqual(expDict, obsDict)
        
    def test_createJobScriptDict_with_cmdSize_and_cmdCopy(self):
        # createJobScriptDict() must return the full substitution dictionary
        # for the job recorded in setUp, here with non-empty cmdSize and
        # cmdCopy snippets. Requires the REPET_* environment variables.
        os.chdir("..")
        cmd_start = "log = os.system(\"touch dummyFile1\")"
        cmd_finish = "shutil.move(\"dummyFile1\", \"/home/user/workspace/repet_pipe/commons/core/launcher/test\")"
        cmd_size = "fileSize = 0.500000"
        cmd_copy = "os.system(\"touch bank.fa\")"
        timeFormat = "%Y%m%d-%H%M%S"
        expDict = {
             "tmpDir" : self._tmpDir,
             "jobTableName" : self._jobTable,
             "groupId" : self._job.groupid,
             "jobName" : self._acronym,
             "launcher" : self._job.launcher,
             "time" : time.strftime(timeFormat),
             "repet_path" : os.environ["REPET_PATH"],
             "repet_host" : os.environ["REPET_HOST"],
             "repet_user" : os.environ["REPET_USER"],
             "repet_pw" : os.environ["REPET_PW"],
             "repet_db" : os.environ["REPET_DB"],
             "repet_port" : os.environ["REPET_PORT"],
             "cmdStart" : cmd_start,
             "cmdFinish" : cmd_finish,
             "cDir" : self._testDir,
             "cmdSize" : cmd_size,
             "cmdCopy" : cmd_copy
             }
        obsDict = self._iScriptWriter.createJobScriptDict(cmd_start, cmd_finish, cmd_size, cmd_copy)
        # The "time" entry has one-second resolution and is presumably
        # stamped inside createJobScriptDict(). If the clock ticked over
        # between building expDict and the call, accept the later second
        # instead of failing spuriously.
        timeAfter = time.strftime(timeFormat)
        if obsDict.get("time") == timeAfter:
            expDict["time"] = timeAfter
        # assertEqual is the supported spelling; assertEquals is a deprecated alias.
        self.assertEqual(expDict, obsDict)
        
class CreateFileThread(threading.Thread):
    """Thread that runs one Python script through ``os.system``."""

    def __init__(self, pyFileName):
        """Remember the path of the script that run() will execute."""
        super(CreateFileThread, self).__init__()
        self._pyFileName = pyFileName

    def run(self):
        """Execute the stored script with the "python" found on PATH."""
        command = "python %s" % self._pyFileName
        os.system(command)

# Module-level suite holding every test of Test_WriteScript, so that the
# file can be aggregated by another runner or executed directly.
# TestLoader.loadTestsFromTestCase is used because unittest.makeSuite is
# deprecated and no longer available in recent Python versions.
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test_WriteScript))
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(test_suite)