6
|
1 from commons.core.launcher.WriteScript import WriteScript
|
|
2 from commons.core.sql.Job import Job
|
|
3 from commons.core.sql.DbFactory import DbFactory
|
|
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
|
|
5 import sys
|
|
6 import stat
|
|
7 import os
|
|
8 import time
|
|
9 import unittest
|
|
10 import glob
|
|
11
|
|
12 class Test_F_TableJobAdaptator(unittest.TestCase):
|
|
13
|
|
14 def setUp(self):
|
|
15 self._jobTableName = "dummyJobTable"
|
|
16 self._db = DbFactory.createInstance()
|
|
17 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
|
|
18
|
|
19 def tearDown(self):
|
|
20 self._db.dropTable(self._jobTableName)
|
|
21 self._db.close()
|
|
22
|
|
23 def test_submitJob_with_multiple_jobs(self):
|
|
24 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
|
|
25 job1 = _createJobInstance("job1")
|
|
26 _createLauncherFile(job1, self._iTJA)
|
|
27 job2 = _createJobInstance("job2")
|
|
28 _createLauncherFile(job2, self._iTJA)
|
|
29 job3 = _createJobInstance("job3")
|
|
30 _createLauncherFile(job3, self._iTJA)
|
|
31
|
|
32 self._iTJA.submitJob( job1, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
|
|
33 self._iTJA.submitJob( job2, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
|
|
34 self._iTJA.submitJob( job3, maxNbWaitingJobs=3, checkInterval=5, verbose=0 )
|
|
35
|
|
36 time.sleep(120)
|
|
37
|
|
38 expJobStatus = "finished"
|
|
39 obsJobStatus1 = self._iTJA.getJobStatus(job1)
|
|
40 obsJobStatus2 = self._iTJA.getJobStatus(job2)
|
|
41 obsJobStatus3 = self._iTJA.getJobStatus(job3)
|
|
42
|
|
43 self.assertEquals(expJobStatus, obsJobStatus1)
|
|
44 self.assertEquals(expJobStatus, obsJobStatus2)
|
|
45 self.assertEquals(expJobStatus, obsJobStatus3)
|
|
46
|
|
47 expErrorFilePrefix1 = job1.jobname + ".e"
|
|
48 expOutputFilePrefix1 = job1.jobname + ".o"
|
|
49 expErrorFilePrefix2 = job2.jobname + ".e"
|
|
50 expOutputFilePrefix2 = job2.jobname + ".o"
|
|
51 expErrorFilePrefix3 = job3.jobname + ".e"
|
|
52 expOutputFilePrefix3 = job3.jobname + ".o"
|
|
53
|
|
54 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
|
|
55 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
|
|
56 lErrorFiles2 = glob.glob(expErrorFilePrefix2 + "*")
|
|
57 lOutputFiles2 = glob.glob(expOutputFilePrefix2 + "*")
|
|
58 lErrorFiles3 = glob.glob(expErrorFilePrefix3 + "*")
|
|
59 lOutputFiles3 = glob.glob(expOutputFilePrefix3 + "*")
|
|
60
|
|
61 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0)
|
|
62 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
|
|
63 isLErrorFileNotEmpty2 = (len(lErrorFiles2) != 0)
|
|
64 isLOutputFileNotEmpty2 = (len(lOutputFiles2) != 0)
|
|
65 isLErrorFileNotEmpty3 = (len(lErrorFiles3) != 0)
|
|
66 isLOutputFileNotEmpty3 = (len(lOutputFiles3) != 0)
|
|
67
|
|
68 os.system("rm launcherFileTest*.py *.e* *.o*")
|
|
69 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
|
|
70 self.assertTrue(isLErrorFileNotEmpty2 and isLOutputFileNotEmpty2)
|
|
71 self.assertTrue(isLErrorFileNotEmpty3 and isLOutputFileNotEmpty3)
|
|
72
|
|
73 def test_submitJob_job_already_submitted(self):
|
|
74 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
|
|
75 iJob = _createJobInstance("job")
|
|
76 self._iTJA.recordJob(iJob)
|
|
77
|
|
78 isSysExitRaised = False
|
|
79 try:
|
|
80 self._iTJA.submitJob(iJob)
|
|
81 except SystemExit:
|
|
82 isSysExitRaised = True
|
|
83 self.assertTrue(isSysExitRaised)
|
|
84
|
|
85 def test_waitJobGroup_with_error_job_maxRelaunch_two(self):
|
|
86 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
|
|
87 iJob = _createJobInstance("job")
|
|
88 _createLauncherFile(iJob, self._iTJA)
|
|
89
|
|
90 self._iTJA.recordJob(iJob)
|
|
91 self._iTJA.changeJobStatus(iJob, "error")
|
|
92
|
|
93 self._iTJA.waitJobGroup(iJob.groupid, 0, 2)
|
|
94
|
|
95 time.sleep(120)
|
|
96
|
|
97 expJobStatus = "finished"
|
|
98 obsJobStatus1 = self._iTJA.getJobStatus(iJob)
|
|
99
|
|
100 self.assertEquals(expJobStatus, obsJobStatus1)
|
|
101
|
|
102 expErrorFilePrefix1 = iJob.jobname + ".e"
|
|
103 expOutputFilePrefix1 = iJob.jobname + ".o"
|
|
104
|
|
105 lErrorFiles1 = glob.glob(expErrorFilePrefix1 + "*")
|
|
106 lOutputFiles1 = glob.glob(expOutputFilePrefix1 + "*")
|
|
107
|
|
108 isLErrorFileNotEmpty1 = (len(lErrorFiles1) != 0)
|
|
109 isLOutputFileNotEmpty1 = (len(lOutputFiles1) != 0)
|
|
110
|
|
111 self._iTJA.removeJob(iJob)
|
|
112 os.system("rm launcherFileTest*.py *.e* *.o*")
|
|
113 self.assertTrue(isLErrorFileNotEmpty1 and isLOutputFileNotEmpty1)
|
|
114
|
|
115 class Test_F_TableJobAdaptator_SGE(unittest.TestCase):
|
|
116
|
|
117 def setUp(self):
|
|
118 if os.environ["REPET_JOB_MANAGER"].lower() != "sge":
|
|
119 print "ERROR: jobs manager is not SGE: REPET_JOB_MANAGER = %s." % os.environ["REPET_JOB_MANAGER"]
|
|
120 sys.exit(0)
|
|
121 self._jobTableName = "dummyJobTable"
|
|
122 self._db = DbFactory.createInstance()
|
|
123 self._db.createTable(self._jobTableName, "jobs", overwrite = True)
|
|
124 self._iTJA = TableJobAdaptatorFactory.createInstance(self._db, self._jobTableName)
|
|
125 self._iJob = _createJobInstance("job")
|
|
126 _createLauncherFile(self._iJob, self._iTJA)
|
|
127
|
|
128 def tearDown(self):
|
|
129 self._db.dropTable(self._jobTableName)
|
|
130 self._db.close()
|
|
131
|
|
132 def test_waitJobGroup_with_several_nbTimeOut_waiting(self):
|
|
133 self._iTJA.recordJob(self._iJob)
|
|
134 self._iTJA.changeJobStatus(self._iJob, "running")
|
|
135
|
|
136 expMsg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore\n" % self._iJob.jobid
|
|
137
|
|
138 obsError = "obsError.txt"
|
|
139 obsErrorHandler = open(obsError, "w")
|
|
140 stderrRef = sys.stderr
|
|
141 sys.stderr = obsErrorHandler
|
|
142
|
|
143 isSysExitRaised = False
|
|
144 try:
|
|
145 self._iTJA.waitJobGroup(self._iJob.groupid, timeOutPerJob = 3)
|
|
146 except SystemExit:
|
|
147 isSysExitRaised = True
|
|
148
|
|
149 obsErrorHandler.close()
|
|
150
|
|
151 obsErrorHandler = open(obsError, "r")
|
|
152 obsMsg = obsErrorHandler.readline()
|
|
153 obsErrorHandler.close()
|
|
154
|
|
155 sys.stderr = stderrRef
|
|
156 os.remove(obsError)
|
|
157 os.system("rm launcherFileTest*.py")
|
|
158 self.assertTrue(isSysExitRaised)
|
|
159 self.assertEquals(expMsg, obsMsg)
|
|
160
|
|
161 def test_isJobStillHandledBySge_True(self):
|
|
162 self._iTJA.submitJob(self._iJob)
|
|
163 isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
|
|
164 os.system("rm launcherFileTest*.py")
|
|
165 self.assertTrue(isJobHandledBySge)
|
|
166
|
|
167 def test_isJobStillHandledBySge_False(self):
|
|
168 self._iTJA.recordJob(self._iJob)
|
|
169 isJobHandledBySge = self._iTJA.isJobStillHandledBySge(self._iJob.jobid, self._iJob.jobname)
|
|
170 os.system("rm launcherFileTest*.py")
|
|
171 self.assertFalse(isJobHandledBySge)
|
|
172
|
|
173 def _createJobInstance(name):
|
|
174 lResources = []
|
|
175 if os.environ.get("HOSTNAME") == "compute-2-46.local":
|
|
176 lResources.append("test=TRUE")
|
|
177 return Job(0, name, "test", "", "log = os.system(\"date;sleep 5;date\")", "%s/launcherFileTest_%s.py" % (os.getcwd(), name), lResources=lResources)
|
|
178
|
|
179 def _createLauncherFile(iJob, iTJA):
|
|
180 iWriteScript = WriteScript(iJob, iTJA, os.getcwd(), os.getcwd())
|
|
181 iWriteScript.run(iJob.command, "", iJob.launcher)
|
|
182 os.chmod(iJob.launcher, stat.S_IRWXU+stat.S_IRWXG+stat.S_IRWXO)
|
|
183
|
|
184 if __name__ == "__main__":
|
|
185 unittest.main()
|