6
|
1 # Copyright INRA (Institut National de la Recherche Agronomique)
|
|
2 # http://www.inra.fr
|
|
3 # http://urgi.versailles.inra.fr
|
|
4 #
|
|
5 # This software is governed by the CeCILL license under French law and
|
|
6 # abiding by the rules of distribution of free software. You can use,
|
|
7 # modify and/ or redistribute the software under the terms of the CeCILL
|
|
8 # license as circulated by CEA, CNRS and INRIA at the following URL
|
|
9 # "http://www.cecill.info".
|
|
10 #
|
|
11 # As a counterpart to the access to the source code and rights to copy,
|
|
12 # modify and redistribute granted by the license, users are provided only
|
|
13 # with a limited warranty and the software's author, the holder of the
|
|
14 # economic rights, and the successive licensors have only limited
|
|
15 # liability.
|
|
16 #
|
|
17 # In this respect, the user's attention is drawn to the risks associated
|
|
18 # with loading, using, modifying and/or developing or reproducing the
|
|
19 # software by the user in light of its specific status of free software,
|
|
20 # that may mean that it is complicated to manipulate, and that also
|
|
21 # therefore means that it is reserved for developers and experienced
|
|
22 # professionals having in-depth computer knowledge. Users are therefore
|
|
23 # encouraged to load and test the software's suitability as regards their
|
|
24 # requirements in conditions enabling the security of their systems and/or
|
|
25 # data to be ensured and, more generally, to use and operate it in the
|
|
26 # same conditions as regards security.
|
|
27 #
|
|
28 # The fact that you are presently reading this means that you have had
|
|
29 # knowledge of the CeCILL license and that you accept its terms.
|
|
30
|
|
31
|
|
32 import os
|
|
33 import time
|
|
34 import sys
|
|
35 import tempfile
|
|
36 import subprocess
|
|
37 from commons.core.sql.Job import Job
|
|
38
|
|
39 ## Methods for Job persistence
|
|
40 #
|
|
class JobAdaptator(object):
    """Scheduler-agnostic base class for submitting jobs and waiting for them.

    Submitted job identifiers are kept in memory (``self._lJobID``) and
    compared with the output of ``qstat``. Subclasses must provide
    ``_getQsubCommand(job)`` and ``_getJobidFromJobManager(fileHandler)``;
    both are called by ``submitJob`` but are not defined here.
    """

    ## Constructor
    #
    # @param lJob list of job identifiers already tracked; when omitted a new empty list is created for this instance
    # @param table string kept in self._table (not used by the methods of this class)
    #
    def __init__(self, lJob = None, table = ""):
        # A literal [] default would be one single list shared by every
        # instance built without an explicit argument, and recordJob()
        # appends to it. A list passed by the caller is still stored as-is
        # (not copied), as before.
        if lJob is None:
            lJob = []
        self._lJobID = lJob
        self._table = table
        # Pattern given to "grep" when filtering qstat output; set by submitJob().
        self._acronym = ""

    ## Record a job
    #
    # @param job value appended to the list of tracked jobs (submitJob() passes the job identifier, not the Job instance)
    #
    def recordJob(self, job):
        self._lJobID.append(job)

    ## Remove a job from the job table
    #
    # Not implemented in this class (does nothing).
    #
    # @param job: job instance to remove
    #
    def removeJob(self, job):
        pass

    ## Set the jobid of a job with the id of SGE
    #
    # Not implemented in this class (does nothing).
    #
    # @param job job instance
    # @param jobid integer
    #
    def updateJobIdInDB(self, job, jobid):
        pass

    ## Get a job status
    #
    # Not implemented in this class (does nothing, returns None).
    #
    # @param job: a Job instance with the job informations
    #
    def getJobStatus(self, job):
        pass

    ## Change a job status
    #
    # Not implemented in this class (does nothing).
    #
    # @param job: a Job instance with the job informations
    # @param status: the new status (waiting,finished,error)
    #
    def changeJobStatus(self, job, status):
        pass

    ## Get the number of jobs belonging to the desired groupid with the desired status.
    #
    # Not implemented in this class (does nothing, returns None).
    #
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    #
    def getCountStatus(self, groupid, status):
        pass

    ## Clean all job from a job group
    #
    # Not implemented in this class (does nothing).
    #
    # @param groupid: a group identifier to record related job series
    #
    def cleanJobGroup(self, groupid):
        pass

    ## Check if there is unfinished job from a job group.
    #
    # Not implemented in this class (does nothing, returns None).
    #
    # @param groupid string a group identifier to record related job series
    #
    def hasUnfinishedJob(self, groupid):
        pass

    ## Get the first column of every qstat line matching the current acronym
    #
    # Runs "qstat | grep <acronym>" through the shell with its output sent to
    # a temporary file. When the pipeline exits with a non-zero status the
    # returned list is empty.
    #
    # @return list of strings (first space-separated field of each line)
    #
    def _getJobIDListFromQstat(self):
        lJobIDFromQstat = []
        tmp = tempfile.NamedTemporaryFile(delete=False)
        fileName = tmp.name
        try:
            cmd = "qstat | grep %s" % self._acronym
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode == 0:
                with open(fileName, "r") as jobidFileHandler:
                    for line in jobidFileHandler:
                        lJobIDFromQstat.append(line.lstrip(" ").split(" ")[0])
        finally:
            # The file is created with delete=False, so it has to be removed
            # here on every path, including a non-zero exit status.
            tmp.close()
            os.remove(fileName)
        return lJobIDFromQstat

    ## Tell whether at least one tracked job identifier is listed by qstat
    #
    # Identifiers of both lists are compared as integers; a value that cannot
    # be converted with int() raises ValueError when it is reached.
    # NOTE(review): identifiers of the form "<number>.<server>" would not
    # convert - confirm what the target scheduler prints in that column.
    #
    # @param lJobID list of tracked job identifiers
    # @param lJobIDFromQstat list of job identifiers read from qstat
    # @return boolean
    #
    def _areJobsStillRunning(self,lJobID,lJobIDFromQstat):
        return any(int(i) == int(j) for i in lJobID for j in lJobIDFromQstat)

    ## Wait until none of the recorded jobs is listed by qstat anymore.
    #
    # Sleeps checkInterval seconds, reads qstat, and repeats until no recorded
    # job identifier is found. Nothing is re-launched by this implementation.
    #
    # @param groupid string a group identifier to record related job series (not used here)
    # @param checkInterval integer time laps in seconds between two checks (default = 5)
    # @param maxRelaunch integer (default = 3), accepted but not used here
    # @param exitIfTooManyErrors boolean (default = True), accepted but not used here
    # @param timeOutPerJob integer (default = 60*60=1h), accepted but not used here
    #
    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        while True:
            time.sleep(checkInterval)
            lJobIDFromQstat = self._getJobIDListFromQstat()
            if not self._areJobsStillRunning(self._lJobID, lJobIDFromQstat):
                break

    ## Submit a job to a queue and record its identifier.
    #
    # The command comes from self._getQsubCommand(job) and the identifier is
    # parsed by self._getJobidFromJobManager(); both are supplied by a subclass.
    # On success the identifier is stored in job.jobid and recorded, and the
    # acronym used to filter qstat becomes the first 10 characters of the part
    # of job.jobname located before the first "_".
    #
    # @param job a job instance
    # @param verbose integer (default = 0), a message is printed when > 0
    # @param maxNbWaitingJobs integer (default = 10000), accepted but not used here
    # @param checkInterval integer (default = 30), accepted but not used here
    # @return integer exit status of the submission command
    #
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        cmd = self._getQsubCommand(job)
        tmp = tempfile.NamedTemporaryFile(delete=False)
        fileName = tmp.name
        try:
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode == 0:
                with open(fileName, "r") as jobidFileHandler:
                    jobid = self._getJobidFromJobManager(jobidFileHandler)
                if verbose > 0:
                    sys.stdout.write("job '%i %s' submitted\n" % (jobid, job.jobname))
                    sys.stdout.flush()
                job.jobid = jobid
                self._acronym = job.jobname.split("_")[0][:10]
                self.recordJob(job.jobid)
        finally:
            # Remove the delete=False temporary file even when the submission
            # failed or its output could not be parsed.
            tmp.close()
            os.remove(fileName)
        return process.returncode

    ## Get the list of nodes where jobs of one group were executed
    #
    # Not implemented in this class (does nothing, returns None).
    #
    # @param groupid string a group identifier of job series
    #
    def getNodesListByGroupId(self, groupId):
        pass

    ## Not implemented in this class (does nothing).
    #
    def checkJobTable(self):
        pass

    ## Not implemented in this class (does nothing).
    #
    def close(self):
        pass

    ## Split an identifier of the form "<jobid>.<nbJob>:<rest>"
    #
    # @param jobid string containing at least one "."
    # @return tuple of two strings: the text before the first "." and the text between that "." and the next ":"
    #
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        tab = tab[1].split(":")
        nbJob = tab[0]
        return jobid, nbJob
|
|
201
|
|
class JobAdaptatorSGE(JobAdaptator):
    """JobAdaptator specialisation building and parsing SGE-style commands."""

    ## Check if a job is still handled by SGE
    #
    # Runs "qstat" with its output sent to a temporary file and looks for a
    # line whose first field equals the job identifier and whose third field
    # is a prefix of the job name. If qstat exits with a non-zero status, an
    # error is written on stderr and the program exits with status 1.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return boolean
    #
    def isJobStillHandledBySge(self, jobid, jobname):
        isJobInQstat = False
        tmp = tempfile.NamedTemporaryFile(delete=False)
        qstatFile = tmp.name
        try:
            cmd = "qstat"
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode != 0:
                msg = "ERROR while launching 'qstat'"
                sys.stderr.write( "%s\n" % msg )
                sys.exit(1)
            with open(qstatFile, "r") as qstatFileHandler:
                for line in qstatFileHandler:
                    tokens = line.split()
                    # Only a prefix of the name is compared, over the length
                    # of the field found in the qstat line.
                    if len(tokens) > 3 and tokens[0] == str(jobid) and jobname.startswith(tokens[2]):
                        isJobInQstat = True
                        break
        finally:
            # The file is created with delete=False: remove it on every path,
            # including the sys.exit(1) above (SystemExit runs this clause).
            tmp.close()
            os.remove(qstatFile)
        return isJobInQstat

    ## Build the shell command that pipes the job launcher into qsub
    #
    # Options always present: -V, -N <jobname>, -cwd. Added when the matching
    # job attribute is not empty: -q <queue>, -l "<resources separated by
    # spaces>", -pe <parallelEnvironment>.
    #
    # @param job job instance (launcher, jobname, queue, lResources and parallelEnvironment are read)
    # @return string
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        cmd += "qsub"
        cmd += " -V"
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        cmd += " -cwd"
        if job.lResources != []:
            cmd += " -l \"%s\"" % " ".join(job.lResources)
        if job.parallelEnvironment != "":
            cmd += " -pe " + job.parallelEnvironment
        return cmd

    ## Read the job identifier from the output of the submission command
    #
    # Takes the third space-separated field of the first line.
    # NOTE(review): presumably the line looks like "Your job <id> (...) has
    # been submitted" - confirm against the qsub output of the target cluster.
    #
    # @param jobidFileHandler file handler opened on the submission output
    # @return integer
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(" ")[2])
|
|
251
|
|
252
|
|
class JobAdaptatorTorque(JobAdaptator):
    """JobAdaptator specialisation building and parsing Torque-style commands."""

    ## Build the shell command that pipes the job launcher into qsub
    #
    # Options always present: -V, -d <current working directory>, -N <jobname>.
    # Added when the matching job attribute is not empty: -q <queue> and
    # -l "<resources separated by spaces>", in which every "mem_free" is
    # replaced by "mem".
    #
    # @param job job instance (launcher, jobname, queue and lResources are read)
    # @return string
    #
    def _getQsubCommand(self, job):
        fragments = [
            "echo '%s' | " % job.launcher,
            "qsub",
            " -V",
            " -d %s" % os.getcwd(),
            " -N %s" % job.jobname,
        ]
        if job.queue != "":
            fragments.append(" -q %s" % job.queue)
        if job.lResources != []:
            resources = " ".join(job.lResources)
            fragments.append(" -l \"")
            fragments.append(resources.replace("mem_free","mem"))
            fragments.append("\"")
        return "".join(fragments)

    ## Read the job identifier from the output of the submission command
    #
    # Takes the text located before the first "." of the first line.
    #
    # @param jobidFileHandler file handler opened on the submission output
    # @return integer
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        firstLine = jobidFileHandler.readline()
        numericPart = firstLine.split(".")[0]
        return int(numericPart)
|