Mercurial > repos > yufei-luo > s_mart
comparison commons/core/sql/TableJobAdaptator.py @ 6:769e306b7933
Change the repository level.
| author | yufei-luo |
|---|---|
| date | Fri, 18 Jan 2013 04:54:14 -0500 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 5:ea3082881bf8 | 6:769e306b7933 |
|---|---|
| 1 # Copyright INRA (Institut National de la Recherche Agronomique) | |
| 2 # http://www.inra.fr | |
| 3 # http://urgi.versailles.inra.fr | |
| 4 # | |
| 5 # This software is governed by the CeCILL license under French law and | |
| 6 # abiding by the rules of distribution of free software. You can use, | |
| 7 # modify and/ or redistribute the software under the terms of the CeCILL | |
| 8 # license as circulated by CEA, CNRS and INRIA at the following URL | |
| 9 # "http://www.cecill.info". | |
| 10 # | |
| 11 # As a counterpart to the access to the source code and rights to copy, | |
| 12 # modify and redistribute granted by the license, users are provided only | |
| 13 # with a limited warranty and the software's author, the holder of the | |
| 14 # economic rights, and the successive licensors have only limited | |
| 15 # liability. | |
| 16 # | |
| 17 # In this respect, the user's attention is drawn to the risks associated | |
| 18 # with loading, using, modifying and/or developing or reproducing the | |
| 19 # software by the user in light of its specific status of free software, | |
| 20 # that may mean that it is complicated to manipulate, and that also | |
| 21 # therefore means that it is reserved for developers and experienced | |
| 22 # professionals having in-depth computer knowledge. Users are therefore | |
| 23 # encouraged to load and test the software's suitability as regards their | |
| 24 # requirements in conditions enabling the security of their systems and/or | |
| 25 # data to be ensured and, more generally, to use and operate it in the | |
| 26 # same conditions as regards security. | |
| 27 # | |
| 28 # The fact that you are presently reading this means that you have had | |
| 29 # knowledge of the CeCILL license and that you accept its terms. | |
| 30 | |
| 31 | |
| 32 import os | |
| 33 import time | |
| 34 import datetime | |
| 35 import sys | |
| 36 from commons.core.sql.Job import Job | |
| 37 from commons.core.sql.TableAdaptator import TableAdaptator | |
| 38 | |
## Methods for Job persistence.
#
# NOTE(review): every query is built by string interpolation; callers are
# internal job-bookkeeping code, but parameterized queries would be safer
# if any job field can ever contain a quote character.
#
class TableJobAdaptator(TableAdaptator):

    ## Record a job
    #
    # Any previous record with the same (groupid, jobname, launcher) is
    # removed first; the new row is inserted with status 'waiting', the
    # current local time, and a literal '?' placeholder for the node
    # (filled in later by changeJobStatus).
    #
    # @param job Job instance with the job informations
    #
    def recordJob(self, job):
        self.removeJob(job)
        sqlCmd = "INSERT INTO %s" % self._table
        sqlCmd += " VALUES ("
        sqlCmd += " \"%s\"," % job.jobid
        sqlCmd += " \"%s\"," % job.jobname
        sqlCmd += " \"%s\"," % job.groupid
        sqlCmd += " \"%s\"," % job.launcher
        sqlCmd += " \"%s\"," % job.queue
        sqlCmd += " \"%s\"," % job.lResources
        sqlCmd += " \"waiting\","
        sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
        sqlCmd += " \"?\" );"
        self._iDb.execute(sqlCmd)


    ## Remove a job from the job table
    #
    # A job is identified by the triplet (groupid, jobname, launcher).
    #
    # @param job: job instance to remove
    #
    def removeJob(self, job):
        qry = "DELETE FROM %s" % self._table
        qry += " WHERE groupid='%s'" % job.groupid
        qry += " AND jobname='%s'" % job.jobname
        qry += " AND launcher='%s';" % job.launcher
        self._iDb.execute(qry)


    ## Set the jobid of a job with the id of SGE
    #
    # @param job job instance
    # @param jobid integer
    #
    def updateJobIdInDB(self, job, jobid):
        #TODO: check if only one job will be updated
        qry = "UPDATE %s" % self._table
        qry += " SET jobid='%i'" % int(jobid)
        qry += " WHERE jobname='%s'" % job.jobname
        qry += " AND groupid='%s'" % job.groupid
        qry += " AND launcher='%s';" % job.launcher
        self._iDb.execute(qry)


    ## Get a job status
    #
    # @param job: a Job instance with the job informations
    # @return string the recorded status, or "unknown" if the job is absent
    #
    def getJobStatus(self, job):
        # legacy convention: a job given only by a numeric id is looked up by name
        if job.jobid != 0 and job.jobname == "":
            job.jobname = job.jobid
            job.jobid = 0
        qry = "SELECT status FROM %s" % self._table
        qry += " WHERE groupid='%s'" % job.groupid
        qry += " AND jobname='%s'" % job.jobname
        qry += " AND launcher='%s';" % job.launcher
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        if len(res) > 1:
            sys.stderr.write("ERROR while getting job status: non-unique jobs\n")
            sys.stderr.flush()
            sys.exit(1)
        if res is None or len(res) == 0:  # 'is None' instead of '== None'
            return "unknown"
        return res[0][0]


    ## Change a job status
    #
    # Also records the execution node reported by the job.
    #
    # @param job: a Job instance with the job informations
    # @param status: the new status (waiting,finished,error)
    #
    def changeJobStatus(self, job, status):
        sqlCmd = "UPDATE %s" % self._table
        sqlCmd += " SET status='%s'" % status
        sqlCmd += ", node='%s'" % job.node
        sqlCmd += " WHERE groupid='%s'" % job.groupid
        sqlCmd += " AND jobname='%s'" % job.jobname
        sqlCmd += " AND launcher='%s';" % job.launcher
        self._iDb.execute(sqlCmd)


    ## Get the number of jobs belonging to the desired groupid with the desired status.
    #
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus(self, groupid, status):
        qry = "SELECT count(jobname) FROM %s" % self._table
        qry += " WHERE groupid='%s'" % groupid
        qry += " AND status='%s';" % status
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        return int(res[0][0])


    ## Clean all job from a job group
    #
    # @param groupid: a group identifier to record related job series
    #
    def cleanJobGroup(self, groupid):
        qry = "DELETE FROM %s WHERE groupid='%s';" % (self._table, groupid)
        self._iDb.execute(qry)


    ## Check if there is unfinished job from a job group.
    #
    # @param groupid string a group identifier to record related job series
    # @return boolean True if at least one job of the group is not 'finished'
    #
    def hasUnfinishedJob(self, groupid):
        qry = "SELECT * FROM %s" % self._table
        qry += " WHERE groupid='%s'" % groupid
        qry += " and status!='finished';"
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        return len(res) != 0


    ## Wait job finished status from a job group.
    # Job are re-launched if error (max. 3 times)
    #
    # @param groupid string a group identifier to record related job series
    # @param checkInterval integer time laps in seconds between two checks (default = 5)
    # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3)
    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
    #
    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        dJob2Err = {}  # jobname -> nb of times the job was seen in error

        # retrieve the total number of jobs belonging to the desired groupid
        qry = "SELECT count(jobname) FROM %s WHERE groupid='%s';" % (self._table, groupid)
        self._iDb.execute(qry)
        totalNbJobs = int(self._iDb.fetchall()[0][0])

        nbTimeOuts = 0

        while True:
            time.sleep(checkInterval)
            # retrieve the finished jobs and stop if all jobs are finished
            nbFinishedJobs = self.getCountStatus(groupid, "finished")
            if nbFinishedJobs == totalNbJobs:
                break

            # retrieve the jobs in error and relaunch them if they are in error (max. 'maxRelaunch' times)
            qry = "SELECT * FROM %s" % self._table
            qry += " WHERE groupid='%s'" % groupid
            qry += " AND status ='error';"
            self._iDb.execute(qry)
            lJobsInError = self._iDb.fetchall()
            for job in lJobsInError:
                jobName = job[1]
                if jobName not in dJob2Err:  # 'has_key' replaced by 'in' (py2/py3 neutral)
                    dJob2Err[jobName] = 1
                if dJob2Err[jobName] < maxRelaunch:
                    print("job '%s' in error, re-submitting (%i)" % (jobName, dJob2Err[jobName]))
                    sys.stdout.flush()
                    # job[5] holds str(list-of-resources); parse it back into a list
                    lResources = job[5].replace("[", "").replace("]", "").replace("'", "").split(", ")
                    newJob = Job(jobname=jobName, groupid=job[2], launcherFile=job[3], queue=job[4], lResources=lResources)
                    self.submitJob(newJob)
                    dJob2Err[jobName] += 1
                else:
                    dJob2Err[jobName] += 1
                    msg = "job '%s' in permanent error (>%i)" % (jobName, maxRelaunch)
                    msg += "\ngroupid = %s" % groupid
                    msg += "\nnb of jobs = %i" % totalNbJobs
                    msg += "\nnb of finished jobs = %i" % self.getCountStatus(groupid, "finished")
                    msg += "\nnb of waiting jobs = %i" % self.getCountStatus(groupid, "waiting")
                    msg += "\nnb of running jobs = %i" % self.getCountStatus(groupid, "running")
                    msg += "\nnb of jobs in error = %i" % self.getCountStatus(groupid, "error")
                    # bug fix: the report was built but never written before flushing
                    print(msg)
                    sys.stdout.flush()
                    if exitIfTooManyErrors:
                        self.cleanJobGroup(groupid)
                        sys.exit(1)
                    else:
                        checkInterval = 60
            nbTimeOuts = self._checkIfJobsTableAndJobsManagerInfoAreConsistent(nbTimeOuts, timeOutPerJob, groupid)


    ## Submit a job to a queue and record it in job table.
    #
    # Exits the program (after cleaning the whole group) if the job was
    # already submitted with a non-error status.
    #
    # @param job a job instance
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
    # @param checkInterval integer time laps in seconds between two checks (default = 30)
    # @return integer return status of the submission command
    #
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        if self.getJobStatus(job) in ["waiting", "running", "finished"]:
            sys.stderr.write("WARNING: job '%s' was already submitted\n" % job.jobname)
            sys.stderr.flush()
            self.cleanJobGroup(job.groupid)
            sys.exit(1)

        # throttle: wait while too many jobs of this group are still queued
        while self.getCountStatus(job.groupid, "waiting") > maxNbWaitingJobs:
            time.sleep(checkInterval)

        self.recordJob(job)
        cmd = self._getQsubCommand(job)  # provided by the scheduler-specific subclass
        returnStatus = os.system(cmd)

        if returnStatus == 0:
            # the qsub command redirected its stdout into this file
            fileName = "jobid.stdout"
            jobidFileHandler = open(fileName, "r")
            jobid = self._getJobidFromJobManager(jobidFileHandler)
            if verbose > 0:
                print("job '%i %s' submitted" % (jobid, job.jobname))
                sys.stdout.flush()
            job.jobid = jobid
            jobidFileHandler.close()
            self.updateJobIdInDB(job, jobid)
            os.remove(fileName)
        return returnStatus


    ## Get the list of nodes where jobs of one group were executed
    #
    # @param groupId string a group identifier of job series
    # @return lNodes list of nodes names without redundancy
    #
    def getNodesListByGroupId(self, groupId):
        qry = "SELECT DISTINCT node FROM %s" % self._table
        qry += " WHERE groupid='%s'" % groupId
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        return [resTuple[0] for resTuple in res]

    ## Create the job table if absent, or recreate it if its schema is stale.
    #
    def checkJobTable(self):
        if not self._iDb.doesTableExist(self._table):
            self._iDb.createTable(self._table, "jobs")
        else:
            lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
            lObsFields = sorted(self._iDb.getFieldList(self._table))
            if lExpFields != lObsFields:
                # schema mismatch: drop and recreate with the expected fields
                self._iDb.createTable(self._table, "jobs", overwrite = True)

    ## Close the underlying database connection.
    #
    def close(self):
        self._iDb.close()

    ## Split a compound job identifier of the form "<jobid>.<nbJob>:...".
    #
    # @param jobid string compound identifier
    # @return tuple (jobid, nbJob) both as strings
    #
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        tab = tab[1].split(":")
        nbJob = tab[0]
        return jobid, nbJob
| 296 | |
## Job persistence backed by the SGE (Sun Grid Engine) scheduler.
#
class TableJobAdaptatorSGE(TableJobAdaptator):

    ## Check that jobs marked 'running' in the table are still known to SGE.
    #
    # Picks one still-running job of the group and compares its submission
    # time with the current time; once a whole timeout interval has elapsed,
    # asks qstat whether the job is still handled by SGE, and aborts the
    # program (cleaning the group) if it is not.
    #
    # NOTE(review): the original comment said "oldest" job, but the query
    # orders by time DESC and therefore returns the most recently submitted
    # running job — confirm which one is intended before changing the query.
    #
    # @param nbTimeOuts integer nb of timeout intervals already elapsed
    # @param timeOutPerJob integer length of one timeout interval in seconds
    # @param groupid string a group identifier to record related job series
    # @return integer updated nb of elapsed timeout intervals
    #
    def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
        # retrieve the date and time at which the oldest, still-running job was submitted
        sql = "SELECT jobid,jobname,time FROM %s WHERE groupid='%s' AND status='running' ORDER BY time DESC LIMIT 1" % (self._table, groupid)
        self._iDb.execute( sql )
        res = self._iDb.fetchall()
        if len(res) > 0:
            jobid = res[0][0]
            jobname = res[0][1]
            dateTimeOldestJob = res[0][2]
            dateTimeCurrent = datetime.datetime.now()
            # delta is time between (i) first job launched of the given groupid and still in running state and (ii) current time
            delta = dateTimeCurrent - dateTimeOldestJob
            # bug fix: delta.seconds ignores delta.days and wraps every 24h;
            # total_seconds() gives the real elapsed time
            elapsed = delta.total_seconds()
            # check if elapsed is in an interval: 0 <= elapsed < 1h | 1h <= elapsed < 2h | ... (timeOutPerJob = 1h)
            if nbTimeOuts * timeOutPerJob <= elapsed < (nbTimeOuts+1) * timeOutPerJob:
                return nbTimeOuts
            # elapsed outside the interval: go to next interval (time out)
            if elapsed >= (nbTimeOuts+1) * timeOutPerJob:
                nbTimeOuts += 1
                # Job with 'running' status should be in qstat. Because status in DB is set at 'running' by the job launched.
                if not self.isJobStillHandledBySge(jobid, jobname):
                    # But if not, let time for the status update (in DB), if the job finished between the query execution and now.
                    time.sleep( 5 )
                    # If no update at 'finished', exit
                    #TODO: check status in DB
                    if not self.isJobStillHandledBySge(jobid, jobname):
                        msg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore" % ( jobid )
                        msg += "\nit was launched the %s (> %.2f hours ago)" % ( dateTimeOldestJob, timeOutPerJob/3600.0 )
                        msg += "\nthis problem can be due to:"
                        msg += "\n* memory shortage, in that case, decrease the size of your jobs;"
                        msg += "\n* timeout, in that case, decrease the size of your jobs;"
                        msg += "\n* node failure or database error, in that case, launch the program again or ask your system administrator."
                        sys.stderr.write("%s\n" % msg)
                        sys.stderr.flush()
                        self.cleanJobGroup(groupid)
                        sys.exit(1)
        return nbTimeOuts

    ## Check if a job is still handled by SGE
    #
    # Runs 'qstat', writes its output to a temporary file and scans it for
    # the given job.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return boolean True if the job appears in the qstat output
    #
    def isJobStillHandledBySge(self, jobid, jobname):
        isJobInQstat = False
        qstatFile = "qstat_stdout"
        cmd = "qstat > %s" % qstatFile
        returnStatus = os.system(cmd)
        if returnStatus != 0:
            msg = "ERROR while launching 'qstat'"
            sys.stderr.write( "%s\n" % msg )
            sys.exit(1)
        qstatFileHandler = open(qstatFile, "r")
        try:
            for line in qstatFileHandler:
                tokens = line.split()
                # qstat truncates long job names, so compare on the displayed prefix
                if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
                    isJobInQstat = True
                    break
        finally:
            # robustness: always release the handle and remove the temp file
            qstatFileHandler.close()
            os.remove(qstatFile)
        return isJobInQstat

    ## Build the SGE submission command for a job.
    #
    # The launcher script is piped into qsub; qsub's stdout (which contains
    # the new job id) is redirected into 'jobid.stdout'.
    #
    # @param job a job instance
    # @return string shell command
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        prg = "qsub"
        cmd += prg
        cmd += " -V"
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        cmd += " -cwd"
        if job.lResources != []:
            cmd += " -l \""
            cmd += " ".join(job.lResources)
            cmd += "\""
        if job.parallelEnvironment != "":
            cmd += " -pe " + job.parallelEnvironment
        cmd += " > jobid.stdout"
        return cmd

    ## Parse the job id out of SGE's qsub output.
    #
    # Expects a first line like "Your job <id> (...) has been submitted",
    # i.e. the id is the third whitespace-separated token.
    #
    # @param jobidFileHandler open file handler on the qsub output
    # @return integer job id
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(" ")[2])
| 381 | |
| 382 | |
## Job persistence backed by the Torque scheduler.
#
class TableJobAdaptatorTorque(TableJobAdaptator):

    ## No table/scheduler consistency check is implemented for Torque.
    #
    # @return integer nbTimeOuts unchanged
    #
    def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
        return nbTimeOuts

    ## Build the Torque submission command for a job.
    #
    # The launcher script is piped into qsub; qsub's stdout (which contains
    # the new job id) is redirected into 'jobid.stdout'.
    #
    # @param job a job instance
    # @return string shell command
    #
    def _getQsubCommand(self, job):
        parts = ["echo '%s' |" % job.launcher, "qsub", "-V"]
        parts.append("-d %s" % os.getcwd())
        parts.append("-N %s" % job.jobname)
        if job.queue != "":
            parts.append("-q %s" % job.queue)
        if job.lResources != []:
            # Torque expects 'mem' where SGE uses 'mem_free'
            resources = " ".join(job.lResources).replace("mem_free", "mem")
            parts.append("-l \"%s\"" % resources)
        parts.append("> jobid.stdout")
        return " ".join(parts)

    ## Parse the job id out of Torque's qsub output.
    #
    # Expects a first line like "<id>.<server>", i.e. the id precedes the
    # first dot.
    #
    # @param jobidFileHandler open file handler on the qsub output
    # @return integer job id
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        firstLine = jobidFileHandler.readline()
        return int(firstLine.split(".")[0])
