Mercurial > repos > yufei-luo > s_mart
comparison smart_toolShed/commons/core/sql/test/Test_F_TableJobAdaptator.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e0f8dcca02ed |
---|---|
1 from commons.core.launcher.WriteScript import WriteScript | |
2 from commons.core.sql.Job import Job | |
3 from commons.core.sql.DbFactory import DbFactory | |
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory | |
5 import sys | |
6 import stat | |
7 import os | |
8 import time | |
9 import unittest | |
10 import glob | |
11 | |
class Test_F_TableJobAdaptator(unittest.TestCase):
    """Tests of the job-table adaptator that submit real jobs.

    Every test works on the scratch table ``dummyJobTable`` through the
    adaptator returned by ``TableJobAdaptatorFactory``.  The table is dropped
    and the database connection closed in ``tearDown``.  Launcher scripts and
    job output files are written to the current working directory and removed
    at the end of each test, whether it passes or fails.
    """

    # Seconds granted to the submitted jobs (each runs "date;sleep 5;date")
    # before their status and output files are checked.
    _JOB_COMPLETION_DELAY = 120

    def setUp(self):
        """Open the database and build the adaptator under test."""
        self._jobTableName = "dummyJobTable"
        self._db = DbFactory.createInstance()
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)

    def tearDown(self):
        """Drop the scratch table; always close the database connection."""
        try:
            self._db.dropTable(self._jobTableName)
        finally:
            self._db.close()

    def test_submitJob_with_multiple_jobs(self):
        """Three submitted jobs must all finish and leave .e/.o files."""
        self._db.createTable(self._jobTableName, "jobs", overwrite=True)
        lJobs = []
        try:
            for jobName in ("job1", "job2", "job3"):
                iJob = _createJobInstance(jobName)
                lJobs.append(iJob)
                _createLauncherFile(iJob, self._iTJA)

            for iJob in lJobs:
                self._iTJA.submitJob(iJob, maxNbWaitingJobs=3, checkInterval=5, verbose=0)

            time.sleep(self._JOB_COMPLETION_DELAY)

            expJobStatus = "finished"
            lObsJobStatus = [self._iTJA.getJobStatus(iJob) for iJob in lJobs]
            for obsJobStatus in lObsJobStatus:
                self.assertEqual(expJobStatus, obsJobStatus)

            for iJob in lJobs:
                self.assertTrue(self._hasErrorAndOutputFiles(iJob))
        finally:
            self._removeJobFiles(lJobs)

    def test_submitJob_job_already_submitted(self):
        """Submitting a job that is already recorded must raise SystemExit."""
        self._db.createTable(self._jobTableName, "jobs", overwrite=True)
        iJob = _createJobInstance("job")
        self._iTJA.recordJob(iJob)

        self.assertRaises(SystemExit, self._iTJA.submitJob, iJob)

    def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
        """A job recorded in "error" state must end up "finished" after waitJobGroup."""
        self._db.createTable(self._jobTableName, "jobs", overwrite=True)
        iJob = _createJobInstance("job")
        try:
            _createLauncherFile(iJob, self._iTJA)

            self._iTJA.recordJob(iJob)
            self._iTJA.changeJobStatus(iJob, "error")

            # NOTE(review): positional arguments are presumably
            # checkInterval=0 and maxRelaunch=2 (per the test name) - confirm
            # against waitJobGroup's signature.
            self._iTJA.waitJobGroup(iJob.groupid, 0, 2)

            time.sleep(self._JOB_COMPLETION_DELAY)

            self.assertEqual("finished", self._iTJA.getJobStatus(iJob))

            hasJobFiles = self._hasErrorAndOutputFiles(iJob)
            self._iTJA.removeJob(iJob)
            self.assertTrue(hasJobFiles)
        finally:
            self._removeJobFiles([iJob])

    def _hasErrorAndOutputFiles(self, iJob):
        """Return True if the current directory holds at least one
        "<jobname>.e*" file and at least one "<jobname>.o*" file."""
        lErrorFiles = glob.glob(iJob.jobname + ".e*")
        lOutputFiles = glob.glob(iJob.jobname + ".o*")
        return len(lErrorFiles) != 0 and len(lOutputFiles) != 0

    def _removeJobFiles(self, lJobs):
        """Delete the launcher scripts and the .e/.o files of the given jobs.

        Only files whose name starts with a job name are removed; a blanket
        "rm *.e* *.o*" would also delete unrelated files of the working
        directory (anything whose name contains ".e" or ".o").
        """
        lPatterns = ["launcherFileTest*.py"]
        for iJob in lJobs:
            lPatterns.append(iJob.jobname + ".e*")
            lPatterns.append(iJob.jobname + ".o*")
        for pattern in lPatterns:
            for fileName in glob.glob(pattern):
                try:
                    os.remove(fileName)
                except OSError:
                    # Best-effort cleanup: a failure here must not hide the
                    # real outcome of the test.
                    pass
114 | |
class Test_F_TableJobAdaptator_SGE(unittest.TestCase):
    """Tests of the job-table adaptator that only make sense with SGE.

    The tests are skipped unless the environment variable
    ``REPET_JOB_MANAGER`` equals "sge" (case-insensitive).  Each test gets a
    fresh ``dummyJobTable`` table, one job named "job" and its launcher
    script; the script and the table are removed in ``tearDown``.
    """

    def setUp(self):
        """Skip when SGE is not the job manager, else prepare table, adaptator and job."""
        # .get() avoids a bare KeyError when the variable is not defined.
        jobManager = os.environ.get("REPET_JOB_MANAGER", "")
        if jobManager.lower() != "sge":
            msg = "ERROR: jobs manager is not SGE: REPET_JOB_MANAGER = %s." % jobManager
            # sys.exit() inside setUp is reported by unittest as an error,
            # not as a skipped test; prefer skipTest where it exists
            # (Python >= 2.7) and keep the historical behaviour otherwise.
            if hasattr(self, "skipTest"):
                self.skipTest(msg)
            print(msg)
            sys.exit(0)
        self._jobTableName = "dummyJobTable"
        self._db = DbFactory.createInstance()
        self._db.createTable(self._jobTableName, "jobs", overwrite=True)
        self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
        self._iJob = _createJobInstance("job")
        _createLauncherFile(self._iJob, self._iTJA)

    def tearDown(self):
        """Remove the launcher scripts, drop the table and close the database."""
        for fileName in glob.glob("launcherFileTest*.py"):
            try:
                os.remove(fileName)
            except OSError:
                # Best-effort cleanup: never mask the test outcome.
                pass
        try:
            self._db.dropTable(self._jobTableName)
        finally:
            self._db.close()

    def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
        """A job marked "running" but unknown to SGE must make waitJobGroup
        exit and report the problem on stderr."""
        self._iTJA.recordJob(self._iJob)
        self._iTJA.changeJobStatus(self._iJob, "running")

        expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % self._iJob.jobid

        obsError = "obsError.txt"
        isSysExitRaised = False
        try:
            # Capture stderr in a file for the duration of the call only.
            stderrRef = sys.stderr
            obsErrorHandler = open(obsError, "w")
            sys.stderr = obsErrorHandler
            try:
                self._iTJA.waitJobGroup(self._iJob.groupid, timeOutPerJob=3)
            except SystemExit:
                isSysExitRaised = True
            finally:
                # Restore stderr whatever happened, so later failures are
                # still reported on the real stream.
                sys.stderr = stderrRef
                obsErrorHandler.close()

            obsErrorHandler = open(obsError, "r")
            try:
                # Only the first line written to stderr is compared.
                obsMsg = obsErrorHandler.readline()
            finally:
                obsErrorHandler.close()
        finally:
            if os.path.exists(obsError):
                os.remove(obsError)

        self.assertTrue(isSysExitRaised)
        self.assertEqual(expMsg, obsMsg)

    def test_isJobStillHandledBySge_True(self):
        """A job that has just been submitted must be known to SGE."""
        self._iTJA.submitJob(self._iJob)
        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
        self.assertTrue(isJobHandledBySge)

    def test_isJobStillHandledBySge_False(self):
        """A job that was only recorded in the table must not be known to SGE."""
        self._iTJA.recordJob(self._iJob)
        isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
        self.assertFalse(isJobHandledBySge)
172 | |
def _createJobInstance(name):
    """Build the test Job called `name`.

    The job command runs "date;sleep 5;date" through os.system and its
    launcher is "launcherFileTest_<name>.py" in the current working
    directory.  On the host "compute-2-46.local" the resource "test=TRUE"
    is requested; elsewhere the resource list is empty.
    """
    if os.environ.get("HOSTNAME") == "compute-2-46.local":
        resources = ["test=TRUE"]
    else:
        resources = []
    launcherPath = "%s/launcherFileTest_%s.py" % (os.getcwd(), name)
    command = "log = os.system(\"date;sleep 5;date\")"
    # NOTE(review): the leading positional arguments are presumably job id,
    # job name, group id and queue - confirm against Job's constructor.
    return Job(0, name, "test", "", command, launcherPath, lResources=resources)
178 | |
def _createLauncherFile(iJob, iTJA):
    """Write the launcher script of `iJob` with WriteScript and make it
    readable, writable and executable by everybody."""
    workingDir = os.getcwd()
    scriptWriter = WriteScript(iJob, iTJA, workingDir, os.getcwd())
    scriptWriter.run(iJob.command, "", iJob.launcher)
    # rwx for user, group and others; the three masks share no bit, so
    # OR-ing them gives the same mode as adding them.
    allPermissions = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    os.chmod(iJob.launcher, allPermissions)
183 | |
# Run every test case of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()