diff smart_toolShed/commons/core/sql/test/Tst_F_RepetJob.py @ 0:e0f8dcca02ed

Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author yufei-luo
date Thu, 17 Jan 2013 10:52:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/smart_toolShed/commons/core/sql/test/Tst_F_RepetJob.py	Thu Jan 17 10:52:14 2013 -0500
@@ -0,0 +1,236 @@
+import os
+import time
+import sys
+import stat
+import unittest
+import glob
+from commons.core.sql.DbMySql import DbMySql
+from commons.core.sql.RepetJob import RepetJob
+from commons.core.sql.Job import Job
+
+class Test_F_RepetJob(unittest.TestCase):
+
+    def setUp(self):
+        self._jobTableName = "dummyJobTable"
+        self._db = DbMySql()
+        self._iRepetJob = RepetJob()
+        self._configFileName = "dummyConfigFile"
+        configF = open(self._configFileName, "w" )
+        configF.write( "[repet_env]\n" )
+        configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) )
+        configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) )
+        configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) )
+        configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) )
+        configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) )
+        configF.close()
+
+    def tearDown(self):
+        self._iRepetJob = None
+        self._db.dropTable( self._jobTableName )
+        self._db.close()
+        os.remove(self._configFileName)
+    
+    def test_submitJob_with_multiple_jobs(self):
+        job1 = self._createJobInstance("job1")
+        self._createLauncherFile(job1)
+
+        job2 = self._createJobInstance("job2")
+        self._createLauncherFile(job2)
+
+        job3 = self._createJobInstance("job3")
+        self._createLauncherFile(job3)
+        
+        self._iRepetJob.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
+        self._iRepetJob.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
+        self._iRepetJob.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
+
+        time.sleep(70)
+        
+        expJobStatus = "finished"
+        obsJobStatus1 = self._iRepetJob.getJobStatus(job1)
+        obsJobStatus2 = self._iRepetJob.getJobStatus(job2)
+        obsJobStatus3 = self._iRepetJob.getJobStatus(job3)
+        
+        self.assertEquals(expJobStatus, obsJobStatus1)
+        self.assertEquals(expJobStatus, obsJobStatus2)
+        self.assertEquals(expJobStatus, obsJobStatus3)
+        
+        jobName1 = job1.jobname
+        jobName2 = job2.jobname
+        jobName3 = job3.jobname
+        
+        expErrorFilePrefix1 = jobName1+ ".e" 
+        expOutputFilePrefix1 = jobName1 + ".o"
+        expErrorFilePrefix2 = jobName2 + ".e" 
+        expOutputFilePrefix2 = jobName2 + ".o"
+        expErrorFilePrefix3 = jobName3 + ".e" 
+        expOutputFilePrefix3 = jobName3 + ".o"
+        
+        lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
+        lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
+        lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*")
+        lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*")
+        lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*")
+        lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*")
+        
+        isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) 
+        isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
+        isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0) 
+        isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0)
+        isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0) 
+        isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0)
+        
+        os.system("rm launcherFileTest*.py *.e* *.o*")
+        self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
+        self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2)
+        self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3)
+
+    def test_submitJob_job_already_submitted(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance("job")
+        self._iRepetJob.recordJob(iJob)
+        
+        isSysExitRaised = False
+        try:
+            self._iRepetJob.submitJob(iJob)
+        except SystemExit:
+            isSysExitRaised = True
+        self.assertTrue(isSysExitRaised)
+    
+    def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance("job")
+        self._createLauncherFile(iJob)
+        self._iRepetJob.recordJob(iJob)
+        self._iRepetJob.changeJobStatus(iJob, "running", "method")
+        
+        expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % ( iJob.jobid )
+        
+        obsError = "obsError.txt"
+        obsErrorHandler = open(obsError, "w")
+        stderrRef = sys.stderr
+        sys.stderr = obsErrorHandler
+        
+        isSysExitRaised = False
+        try:
+            self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, timeOutPerJob=3)
+        except SystemExit:
+            isSysExitRaised = True
+           
+        obsErrorHandler.close()
+        
+        obsErrorHandler = open(obsError, "r")
+        obsMsg = obsErrorHandler.readline()
+        obsErrorHandler.close()
+       
+        sys.stderr = stderrRef
+        os.remove(obsError)
+        os.system("rm launcherFileTest*.py")
+        self.assertTrue(isSysExitRaised)
+        self.assertEquals(expMsg, obsMsg)
+         
+    def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance("job")
+        self._createLauncherFile(iJob)
+        
+        self._iRepetJob.recordJob(iJob)
+        self._iRepetJob.changeJobStatus(iJob, "error", "method")
+        
+        self._iRepetJob.waitJobGroup(self._jobTableName ,iJob.groupid, 0, 2)
+        
+        time.sleep(10)
+        
+        expJobStatus = "finished"
+        obsJobStatus1 = self._iRepetJob.getJobStatus(iJob)
+        
+        self.assertEquals(expJobStatus, obsJobStatus1)
+        
+        jobName = iJob.jobname
+        
+        expErrorFilePrefix1 = jobName + ".e" 
+        expOutputFilePrefix1 = jobName + ".o"
+        
+        lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
+        lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
+        
+        isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0) 
+        isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
+        
+        self._iRepetJob.removeJob(iJob) 
+        os.system("rm launcherFileTest*.py *.e* *.o*")
+        self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
+        
+
+    def test_isJobStillHandledBySge_True(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance("job")
+        self._createLauncherFile(iJob)
+        self._iRepetJob.submitJob(iJob)
+        
+        isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname)
+        os.system("rm launcherFileTest*.py")
+        
+        self.assertTrue(isJobHandledBySge)
+
+    def test_isJobStillHandledBySge_False(self):
+        self._iRepetJob.createTable(self._jobTableName, "jobs")
+        iJob = self._createJobInstance("job")
+        self._createLauncherFile(iJob)
+        self._iRepetJob.recordJob(iJob)
+        
+        isJobHandledBySge = self._iRepetJob.isJobStillHandledBySge(iJob.jobid, iJob.jobname)
+        os.system("rm launcherFileTest*.py")
+        
+        self.assertFalse(isJobHandledBySge)
+        
+    def _createJobInstance(self, name):
+        return Job(self._jobTableName, 0, name, "test", "", "date;sleep 5;date", "./launcherFileTest_"+ name +".py")
+    
+    def _createLauncherFile(self, iJob):
+        jobFileHandler = open( iJob.launcher , "w" )
+
+        launcher = "#!/usr/bin/python\n"
+        launcher += "import os\n"
+        launcher += "import sys\n"
+        
+        launcher += "print \"system:\", os.uname()\n"
+        launcher += "sys.stdout.flush()\n"
+        newStatus = "running"
+        prg = "%s/bin/srptChangeJobStatus.py" % (os.environ["REPET_PATH"])
+        cmd = prg
+        cmd += " -t %s" % ( iJob.tablename )
+        cmd += " -n %s" % ( iJob.jobname )
+        cmd += " -g %s" % ( iJob.groupid )
+        if iJob.queue != "":
+            cmd += " -q %s" % ( iJob.queue )
+        cmd += " -s %s" % ( newStatus )
+        cmd += " -c %s"  %( self._configFileName )
+        cmd += " -v 1"
+        launcher +="os.system( \"" + cmd + "\" )\n"
+        
+        launcher += "print \"LAUNCH: "+ iJob.command + "\"\n"
+        launcher += "sys.stdout.flush()\n"
+        launcher += "exitStatus = os.system (\"" + iJob.command + "\")\n"
+        launcher += "if exitStatus != 0:\n"
+        launcher += "\tprint \"ERROR: "+  iJob.command + " returned exit status '%i'\" % ( exitStatus )\n"
+        
+        newStatus = "finished"
+        prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py"
+        cmd = prg
+        cmd += " -t %s" % ( iJob.tablename )
+        cmd += " -n %s" % ( iJob.jobname )
+        cmd += " -g %s" % ( iJob.groupid )
+        if iJob.queue != "":
+            cmd += " -q %s" % ( iJob.queue )
+        cmd += " -s %s" % ( newStatus )
+        cmd += " -c %s"  %( self._configFileName )
+        cmd += " -v 1"
+        launcher +="os.system( \"" + cmd + "\" )\n"
+        launcher += "sys.exit(0)\n"
+        jobFileHandler.write(launcher)
+        jobFileHandler.close()
+        os.chmod( iJob.launcher, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC )
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file