comparison commons/core/sql/JobAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31
32 import os
33 import time
34 import sys
35 import tempfile
36 import subprocess
37 from commons.core.sql.Job import Job
38
## Methods for Job persistence
#
# In-memory job tracker: the identifiers of the submitted jobs are kept in a
# plain list and completion is detected by polling 'qstat'.
# The table-oriented methods (removeJob, getJobStatus, ...) are placeholders
# that do nothing in this class.
# Concrete adaptators must implement _getQsubCommand() and
# _getJobidFromJobManager().
#
class JobAdaptator(object):

    ## Constructor
    #
    # @param lJob list of job identifiers already tracked (default = None, meaning a new empty list)
    # @param table string name of the job table (stored only, not used by this class)
    #
    def __init__(self, lJob = None, table = "" ):
        # A '[]' default would be created once and shared by every instance
        # built without 'lJob'; allocate a fresh list per instance instead.
        # A list supplied by the caller is still stored as is (not copied).
        if lJob is None:
            lJob = []
        self._lJobID = lJob
        self._table = table
        # Job-name prefix used to filter the 'qstat' output; set by submitJob().
        self._acronym = ""

    ## Record a job
    #
    # @param job value appended to the list of tracked jobs (submitJob() passes the job identifier)
    #
    def recordJob(self, job):
        self._lJobID.append(job)

    ## Remove a job from the job table (placeholder: does nothing here)
    #
    # @param job job instance to remove
    #
    def removeJob(self, job):
        pass

    ## Set the jobid of a job with the id of SGE (placeholder: does nothing here)
    #
    # @param job job instance
    # @param jobid integer
    #
    def updateJobIdInDB(self, job, jobid):
        pass

    ## Get a job status (placeholder: does nothing here and returns None)
    #
    # @param job a Job instance with the job information
    #
    def getJobStatus(self, job):
        pass

    ## Change a job status (placeholder: does nothing here)
    #
    # @param job a Job instance with the job information
    # @param status the new status (waiting, finished, error)
    #
    def changeJobStatus(self, job, status):
        pass

    ## Get the number of jobs belonging to the desired groupid with the desired status
    # (placeholder: does nothing here and returns None)
    #
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    #
    def getCountStatus(self, groupid, status):
        pass

    ## Clean all jobs from a job group (placeholder: does nothing here)
    #
    # @param groupid a group identifier to record related job series
    #
    def cleanJobGroup(self, groupid):
        pass

    ## Check if there is an unfinished job in a job group
    # (placeholder: does nothing here and returns None)
    #
    # @param groupid string a group identifier to record related job series
    #
    def hasUnfinishedJob(self, groupid):
        pass

    ## Build the shell command submitting a job; must be provided by subclasses
    #
    # @param job a job instance
    # @return string the command line to run
    #
    def _getQsubCommand(self, job):
        raise NotImplementedError("%s must implement _getQsubCommand()" % self.__class__.__name__)

    ## Extract the job identifier from the submission output; must be provided by subclasses
    #
    # @param jobidFileHandler file handle opened on the captured output of the submission command
    # @return int the job identifier
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        raise NotImplementedError("%s must implement _getJobidFromJobManager()" % self.__class__.__name__)

    ## Get the identifiers listed by 'qstat' on the lines matching the current acronym
    #
    # @return list of strings, the first whitespace-separated token of each matching line;
    #         empty if the command pipeline exits with a non-zero status
    #         (e.g. 'grep' found no matching line)
    #
    def _getJobIDListFromQstat(self):
        lJobIDFromQstat = []
        cmd ="qstat | grep %s" % self._acronym
        tmp = tempfile.NamedTemporaryFile(delete=False)
        fileName = tmp.name
        try:
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode == 0:
                with open(fileName, "r") as jobidFileHandler:
                    for line in jobidFileHandler:
                        # split() ignores leading blanks and the trailing
                        # newline, and yields nothing for an empty line.
                        tokens = line.split()
                        if tokens:
                            lJobIDFromQstat.append(tokens[0])
        finally:
            # The file is created with delete=False: remove it whatever the
            # outcome, otherwise one file is left behind per failed call.
            tmp.close()
            if os.path.exists(fileName):
                os.remove(fileName)
        return lJobIDFromQstat

    ## Convert a job identifier to its integer job number
    #
    # Only the part before the first '.' is used, so that identifiers of the
    # form '<number>.<server>' (the form JobAdaptatorTorque parses from the
    # qsub output; presumably also what Torque's qstat prints) compare equal
    # to the plain number.
    #
    # @param jobid integer or string job identifier
    # @return int
    #
    @staticmethod
    def _toJobNumber(jobid):
        return int(str(jobid).split(".")[0])

    ## Tell whether at least one tracked job is still listed by qstat
    #
    # @param lJobID list of tracked job identifiers (integers or numeric strings)
    # @param lJobIDFromQstat list of job identifiers read from qstat
    # @return boolean True if the two lists share at least one job number
    #
    def _areJobsStillRunning(self,lJobID,lJobIDFromQstat):
        sTrackedJobNumbers = set(self._toJobNumber(jobid) for jobid in lJobID)
        if not sTrackedJobNumbers:
            # Nothing tracked: nothing can still be running.
            return False
        return any(self._toJobNumber(jobid) in sTrackedJobNumbers for jobid in lJobIDFromQstat)

    ## Wait until none of the tracked jobs is listed by qstat any more
    #
    # This implementation only polls qstat: jobs are neither relaunched nor
    # checked for errors. A failing qstat yields an empty list and is therefore
    # handled like "all jobs finished".
    #
    # @param groupid string a group identifier to record related job series (unused here)
    # @param checkInterval integer time lapse in seconds between two checks (default = 5)
    # @param maxRelaunch integer (unused here, kept for interface compatibility)
    # @param exitIfTooManyErrors boolean (unused here, kept for interface compatibility)
    # @param timeOutPerJob integer (unused here, kept for interface compatibility)
    #
    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        while True:
            # Sleep first: the first check happens 'checkInterval' seconds after the call.
            time.sleep(checkInterval)
            lJobIDFromQstat = self._getJobIDListFromQstat()
            if not self._areJobsStillRunning(self._lJobID, lJobIDFromQstat):
                break

    ## Submit a job to a queue and record its identifier
    #
    # On success, job.jobid is set, the acronym used to filter qstat is updated
    # and the identifier is added to the tracked jobs.
    #
    # @param job a job instance
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer (unused here, kept for interface compatibility)
    # @param checkInterval integer (unused here, kept for interface compatibility)
    # @return int the exit status of the submission command (0 on success)
    #
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        cmd = self._getQsubCommand(job)
        tmp = tempfile.NamedTemporaryFile(delete=False)
        fileName = tmp.name
        try:
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode == 0:
                with open(fileName, "r") as jobidFileHandler:
                    jobid = self._getJobidFromJobManager(jobidFileHandler)
                if verbose > 0:
                    print("job '%i %s' submitted" % (jobid, job.jobname))
                    sys.stdout.flush()
                job.jobid = jobid
                # First '_'-separated field of the job name, cut to 10 characters;
                # _getJobIDListFromQstat() greps the qstat output for it.
                self._acronym = job.jobname.split("_")[0][:10]
                self.recordJob(job.jobid)
        finally:
            # Remove the capture file even when the submission failed or the
            # output could not be parsed.
            tmp.close()
            if os.path.exists(fileName):
                os.remove(fileName)
        return process.returncode

    ## Get the list of nodes where jobs of one group were executed
    # (placeholder: does nothing here and returns None)
    #
    # @param groupId string a group identifier of job series
    #
    def getNodesListByGroupId(self, groupId):
        pass

    ## Check the job table (placeholder: does nothing here)
    #
    def checkJobTable(self):
        pass

    ## Release the resources held by the adaptator (placeholder: does nothing here)
    #
    def close(self):
        pass

    ## Split an identifier of the form '<jobid>.<nbJob>:<rest>'
    #
    # @param jobid string containing at least one '.' (IndexError is raised otherwise)
    # @return tuple of two strings: the text before the first '.', and the text
    #         between that '.' and the next '.' or ':'
    #
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        tab = tab[1].split(":")
        nbJob = tab[0]
        return jobid, nbJob
201
## Job adaptator for SGE: builds the SGE 'qsub' command line and parses its output
#
class JobAdaptatorSGE(JobAdaptator):

    ## Check if a job is still handled by SGE
    #
    # Runs 'qstat' and looks for a line whose first column equals the job
    # identifier and whose third column is a prefix of the job name.
    # The process exits with status 1 if 'qstat' fails.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return boolean True if such a line is found in the qstat output
    #
    def isJobStillHandledBySge(self, jobid, jobname):
        isJobInQstat = False
        cmd = "qstat"
        tmp = tempfile.NamedTemporaryFile(delete=False)
        qstatFile = tmp.name
        try:
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode != 0:
                msg = "ERROR while launching 'qstat'"
                sys.stderr.write( "%s\n" % msg )
                sys.exit(1)
            with open(qstatFile, "r") as qstatFileHandler:
                for line in qstatFileHandler:
                    tokens = line.split()
                    # Prefix match on the name column: presumably because
                    # qstat displays a truncated job name.
                    if len(tokens) > 3 and tokens[0] == str(jobid) and jobname.startswith(tokens[2]):
                        isJobInQstat = True
                        break
        finally:
            # The file is created with delete=False: remove it on every path,
            # including the sys.exit() above.
            tmp.close()
            if os.path.exists(qstatFile):
                os.remove(qstatFile)
        return isJobInQstat

    ## Build the shell command submitting a job to SGE
    #
    # The launcher is echoed into 'qsub', which runs with the current
    # environment (-V) in the current working directory (-cwd).
    #
    # @param job a job instance (launcher, jobname, queue, lResources and parallelEnvironment are read)
    # @return string the command line
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        cmd += "qsub"
        cmd += " -V"
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        cmd += " -cwd"
        if job.lResources != []:
            cmd += " -l \"%s\"" % " ".join(job.lResources)
        if job.parallelEnvironment != "":
            cmd += " -pe " + job.parallelEnvironment
        return cmd

    ## Extract the job identifier from the output of 'qsub'
    #
    # The identifier is the third space-separated token of the first line
    # (presumably a message such as 'Your job <id> ("<name>") has been submitted').
    #
    # @param jobidFileHandler file handle opened on the captured qsub output
    # @return int the job identifier
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(" ")[2])
251
252
## Job adaptator for Torque: builds the Torque 'qsub' command line and parses its output
#
class JobAdaptatorTorque(JobAdaptator):

    ## Build the shell command submitting a job to Torque
    #
    # The launcher is echoed into 'qsub', which runs with the current
    # environment (-V) and the current working directory passed with -d.
    # In the resource list, 'mem_free' is renamed 'mem'.
    #
    # @param job a job instance (launcher, jobname, queue and lResources are read)
    # @return string the command line
    #
    def _getQsubCommand(self, job):
        lParts = [
            "echo '%s' | " % job.launcher,
            "qsub",
            " -V",
            " -d %s" % os.getcwd(),
            " -N %s" % job.jobname,
        ]
        if job.queue != "":
            lParts.append(" -q %s" % job.queue)
        if job.lResources != []:
            resources = " ".join(job.lResources).replace("mem_free","mem")
            lParts.append(" -l \"" + resources + "\"")
        return "".join(lParts)

    ## Extract the job identifier from the output of 'qsub'
    #
    # The identifier is the text before the first '.' on the first line.
    #
    # @param jobidFileHandler file handle opened on the captured qsub output
    # @return int the job identifier
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        firstLine = jobidFileHandler.readline()
        return int(firstLine.split(".")[0])