Mercurial > repos > yufei-luo > s_mart
diff commons/core/sql/TableJobAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/sql/TableJobAdaptator.py Fri Jan 18 04:54:14 2013 -0500 @@ -0,0 +1,405 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. 
import os
import time
import datetime
import sys
from commons.core.sql.Job import Job
from commons.core.sql.TableAdaptator import TableAdaptator


## Methods for Job persistence
#
# Job records live in the table named by self._table and are accessed through
# the database handle self._iDb (both presumably set up by TableAdaptator).
# Queries are built by plain string interpolation; values are not escaped.
#
# Submission and monitoring call three hooks that this class does not define
# and that the job-manager specific subclasses below provide:
#   _getQsubCommand(job)
#   _getJobidFromJobManager(jobidFileHandler)
#   _checkIfJobsTableAndJobsManagerInfoAreConsistent(nbTimeOuts, timeOutPerJob, groupid)
#
class TableJobAdaptator(TableAdaptator):

    ## Record a job
    #
    # Any previous record of the same job (same groupid, jobname and launcher)
    # is removed first. The new record gets the status 'waiting', the current
    # local time and the placeholder '?' as node.
    #
    # @param job Job instance with the job informations
    #
    def recordJob(self, job):
        self.removeJob(job)
        sqlCmd = "INSERT INTO %s" % self._table
        sqlCmd += " VALUES ("
        sqlCmd += " \"%s\"," % job.jobid
        sqlCmd += " \"%s\"," % job.jobname
        sqlCmd += " \"%s\"," % job.groupid
        sqlCmd += " \"%s\"," % job.launcher
        sqlCmd += " \"%s\"," % job.queue
        sqlCmd += " \"%s\"," % job.lResources
        sqlCmd += " \"waiting\","
        sqlCmd += " \"%s\"," % time.strftime("%Y-%m-%d %H:%M:%S")
        sqlCmd += " \"?\" );"
        self._iDb.execute(sqlCmd)


    ## Remove a job from the job table
    #
    # The job is identified by its groupid, jobname and launcher.
    #
    # @param job: job instance to remove
    #
    def removeJob(self, job):
        qry = "DELETE FROM %s" % self._table
        qry += " WHERE groupid='%s'" % job.groupid
        qry += " AND jobname='%s'" % job.jobname
        qry += " AND launcher='%s';" % job.launcher
        self._iDb.execute(qry)


    ## Set the jobid of a job with the id of SGE
    #
    # @param job job instance
    # @param jobid integer (anything accepted by int())
    #
    def updateJobIdInDB(self, job, jobid):
        #TODO: check if only one job will be updated
        qry = "UPDATE %s" % self._table
        qry += " SET jobid='%i'" % int(jobid)
        qry += " WHERE jobname='%s'" % job.jobname
        qry += " AND groupid='%s'" % job.groupid
        qry += " AND launcher='%s';" % job.launcher
        self._iDb.execute(qry)


    ## Get a job status
    #
    # If the job has a non-zero jobid but an empty jobname, the jobid is moved
    # into jobname (and jobid reset to 0) before querying; the job instance
    # given by the caller is modified in place.
    #
    # Exits the program (status 1) if several records match the job.
    #
    # @param job: a Job instance with the job informations
    # @return string the recorded status, or "unknown" if no record matches
    #
    def getJobStatus(self, job):
        if job.jobid != 0 and job.jobname == "":
            job.jobname = job.jobid
            job.jobid = 0
        qry = "SELECT status FROM %s" % self._table
        qry += " WHERE groupid='%s'" % job.groupid
        qry += " AND jobname='%s'" % job.jobname
        qry += " AND launcher='%s';" % job.launcher
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        # test for "no result" first: calling len() on None would raise a TypeError
        if res is None or len(res) == 0:
            return "unknown"
        if len(res) > 1:
            sys.stderr.write("ERROR while getting job status: non-unique jobs\n")
            sys.stderr.flush()
            sys.exit(1)
        return res[0][0]


    ## Change a job status
    #
    # The node column is updated at the same time from job.node.
    #
    # @param job: a Job instance with the job informations
    # @param status: the new status (waiting,finished,error)
    #
    def changeJobStatus(self, job, status):
        sqlCmd = "UPDATE %s" % self._table
        sqlCmd += " SET status='%s'" % status
        sqlCmd += ", node='%s'" % job.node
        sqlCmd += " WHERE groupid='%s'" % job.groupid
        sqlCmd += " AND jobname='%s'" % job.jobname
        sqlCmd += " AND launcher='%s';" % job.launcher
        self._iDb.execute(sqlCmd)


    ## Get the number of jobs belonging to the desired groupid with the desired status.
    #
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus(self, groupid, status):
        qry = "SELECT count(jobname) FROM %s" % self._table
        qry += " WHERE groupid='%s'" % groupid
        qry += " AND status='%s';" % status
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        return int(res[0][0])


    ## Clean all job from a job group
    #
    # @param groupid: a group identifier to record related job series
    #
    def cleanJobGroup(self, groupid):
        qry = "DELETE FROM %s WHERE groupid='%s';" % (self._table, groupid)
        self._iDb.execute(qry)


    ## Check if there is unfinished job from a job group.
    #
    # @param groupid string a group identifier to record related job series
    # @return boolean True if at least one job of the group has a status other than 'finished'
    #
    def hasUnfinishedJob(self, groupid):
        qry = "SELECT * FROM %s" % self._table
        qry += " WHERE groupid='%s'" % groupid
        qry += " and status!='finished';"
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        return len(res) != 0


    ## Wait job finished status from a job group.
    # Job are re-launched if error (max. 'maxRelaunch' times)
    #
    # The job table is polled every 'checkInterval' seconds until the number of
    # finished jobs equals the number of jobs recorded for the group when the
    # method was called. When a job stays in error and exitIfTooManyErrors is
    # False, the polling interval is raised to 60 seconds and the loop goes on.
    #
    # @param groupid string a group identifier to record related job series
    # @param checkInterval integer time laps in seconds between two checks (default = 5)
    # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3)
    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
    #
    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        # job name -> counter driving the re-submissions (starts at 1 on the first error seen)
        dJob2Err = {}

        # retrieve the total number of jobs belonging to the desired groupid
        qry = "SELECT count(jobname) FROM %s WHERE groupid='%s';" % (self._table, groupid)
        self._iDb.execute(qry)
        totalNbJobs = int(self._iDb.fetchall()[0][0])

        nbTimeOuts = 0

        while True:
            time.sleep(checkInterval)
            # retrieve the finished jobs and stop if all jobs are finished
            nbFinishedJobs = self.getCountStatus(groupid, "finished")
            if nbFinishedJobs == totalNbJobs:
                break

            # retrieve the jobs in error and relaunch them if they are in error (max. 'maxRelaunch' times)
            qry = "SELECT * FROM %s" % self._table
            qry += " WHERE groupid='%s'" % groupid
            qry += " AND status ='error';"
            self._iDb.execute(qry)
            lJobsInError = self._iDb.fetchall()
            for job in lJobsInError:
                # columns used here: 1=jobname, 2=groupid, 3=launcher, 4=queue, 5=resources
                jobName = job[1]
                if jobName not in dJob2Err:
                    dJob2Err[jobName] = 1
                if dJob2Err[jobName] < maxRelaunch:
                    sys.stdout.write("job '%s' in error, re-submitting (%i)\n" % (jobName, dJob2Err[jobName]))
                    sys.stdout.flush()
                    # the resources were stored as the string form of a list: parse it back,
                    # dropping empty entries so that a stored "[]" gives [] and not ['']
                    strResources = job[5].replace("[", "").replace("]", "").replace("'", "")
                    lResources = [resource for resource in strResources.split(", ") if resource != ""]
                    newJob = Job(jobname=jobName, groupid=job[2], launcherFile=job[3], queue=job[4], lResources=lResources)
                    self.submitJob(newJob)
                    dJob2Err[jobName] += 1
                else:
                    dJob2Err[jobName] += 1
                    cmd = "job '%s' in permanent error (>%i)" % (jobName, maxRelaunch)
                    cmd += "\ngroupid = %s" % groupid
                    cmd += "\nnb of jobs = %i" % totalNbJobs
                    cmd += "\nnb of finished jobs = %i" % self.getCountStatus(groupid, "finished")
                    cmd += "\nnb of waiting jobs = %i" % self.getCountStatus(groupid, "waiting")
                    cmd += "\nnb of running jobs = %i" % self.getCountStatus(groupid, "running")
                    cmd += "\nnb of jobs in error = %i" % self.getCountStatus(groupid, "error")
                    # the report used to be built and then dropped: actually write it
                    sys.stdout.write("%s\n" % cmd)
                    sys.stdout.flush()
                    if exitIfTooManyErrors:
                        self.cleanJobGroup(groupid)
                        sys.exit(1)
                    else:
                        checkInterval = 60
            nbTimeOuts = self._checkIfJobsTableAndJobsManagerInfoAreConsistent(nbTimeOuts, timeOutPerJob, groupid)


    ## Submit a job to a queue and record it in job table.
    #
    # Exits the program (status 1, after cleaning the whole job group) if the
    # job is already recorded as waiting, running or finished.
    # The submission command writes its output to the file 'jobid.stdout' in
    # the current directory; the job identifier is parsed from it and the file
    # is then removed.
    #
    # @param job a job instance
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
    # @param checkInterval integer time laps in seconds between two checks (default = 30)
    # @return integer the exit status of the submission command, as given by os.system()
    #
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        if self.getJobStatus(job) in ["waiting", "running", "finished"]:
            sys.stderr.write( "WARNING: job '%s' was already submitted\n" % job.jobname)
            sys.stderr.flush()
            self.cleanJobGroup(job.groupid)
            sys.exit(1)

        while self.getCountStatus(job.groupid, "waiting") > maxNbWaitingJobs:
            time.sleep(checkInterval)

        self.recordJob(job)
        cmd = self._getQsubCommand(job)
        returnStatus = os.system(cmd)

        if returnStatus == 0:
            fileName = "jobid.stdout"
            # 'with' closes the file even if the job identifier cannot be parsed
            with open(fileName, "r") as jobidFileHandler:
                jobid = self._getJobidFromJobManager(jobidFileHandler)
            if verbose > 0:
                sys.stdout.write("job '%i %s' submitted\n" % (jobid, job.jobname))
                sys.stdout.flush()
            job.jobid = jobid
            self.updateJobIdInDB(job, jobid)
            os.remove(fileName)
        return returnStatus


    ## Get the list of nodes where jobs of one group were executed
    #
    # @param groupId string a group identifier of job series
    # @return lNodes list of nodes names without redundancy
    #
    def getNodesListByGroupId(self, groupId):
        qry = "SELECT DISTINCT node FROM %s" % self._table
        qry += " WHERE groupid='%s'" % groupId
        self._iDb.execute(qry)
        res = self._iDb.fetchall()
        return [resTuple[0] for resTuple in res]


    ## Make sure the job table exists with the expected fields
    #
    # The table is created if missing, and re-created (overwrite) if its
    # fields differ from the expected ones.
    #
    def checkJobTable(self):
        if not self._iDb.doesTableExist(self._table):
            self._iDb.createTable(self._table, "jobs")
        else:
            lExpFields = sorted(["jobid", "jobname", "groupid", "launcher", "queue", "resources", "status", "time", "node"])
            lObsFields = sorted(self._iDb.getFieldList(self._table))
            if lExpFields != lObsFields:
                self._iDb.createTable(self._table, "jobs", overwrite = True)


    ## Close the database handle
    #
    def close(self):
        self._iDb.close()


    ## Split an identifier of the form '<jobid>.<nbJob>:<...>'
    #
    # @param jobid string identifier to split
    # @return tuple of strings: the part before the first '.', and the part
    #         between that '.' and the next ':'
    #
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        tab = tab[1].split(":")
        nbJob = tab[0]
        return jobid, nbJob


## Job persistence and submission for SGE
#
class TableJobAdaptatorSGE(TableJobAdaptator):

    ## Check that a job recorded as 'running' is still known by SGE
    #
    # The check against qstat is only done when a new time-out interval is
    # reached (each interval lasts 'timeOutPerJob' seconds). Exits the program
    # (status 1, after cleaning the job group) if the job is twice in a row
    # absent from qstat.
    #
    # @param nbTimeOuts integer number of time-out intervals already elapsed
    # @param timeOutPerJob integer length in seconds of a time-out interval
    # @param groupid string a group identifier of job series
    # @return integer the (possibly incremented) number of time-outs
    #
    def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
        # retrieve the date and time at which a still-running job was submitted
        # NOTE(review): the names below say "oldest" but 'ORDER BY time DESC' selects
        # the most recently recorded running job -- confirm which one is intended
        sql = "SELECT jobid,jobname,time FROM %s WHERE groupid='%s' AND status='running' ORDER BY time DESC LIMIT 1" % (self._table, groupid)
        self._iDb.execute( sql )
        res = self._iDb.fetchall()
        if len(res) > 0:
            jobid = res[0][0]
            jobname = res[0][1]
            # assumes the 'time' column comes back as a datetime.datetime -- TODO confirm
            dateTimeOldestJob = res[0][2]
            dateTimeCurrent = datetime.datetime.now()
            # delta is time between (i) the selected job of the given groupid still in running state and (ii) current time
            delta = dateTimeCurrent - dateTimeOldestJob
            # timedelta.seconds only holds the remainder below one day: add the whole days,
            # otherwise the elapsed time wraps around for jobs running more than 24h
            elapsedSeconds = delta.days * 86400 + delta.seconds
            # check if delta is in an interval: 0 <= delta < 1h | 1h <= delta < 2h | 2h <= delta < 3h (timeOutPerJob = 1h)
            if elapsedSeconds >= nbTimeOuts * timeOutPerJob and elapsedSeconds < (nbTimeOuts+1) * timeOutPerJob:
                return nbTimeOuts
            # delta outside the interval: go to next interval (time out)
            if elapsedSeconds >= (nbTimeOuts+1) * timeOutPerJob:
                nbTimeOuts += 1
                # Job with 'running' status should be in qstat. Because status in DB is set at 'running' by the job launched.
                if not self.isJobStillHandledBySge(jobid, jobname):
                    # But if not, let time for the status update (in DB), if the job finished between the query execution and now.
                    time.sleep( 5 )
                    # If no update at 'finished', exit
                    #TODO: check status in DB
                    if not self.isJobStillHandledBySge(jobid, jobname):
                        msg = "ERROR: job '%s', supposedly still running, is not handled by SGE anymore" % ( jobid )
                        msg += "\nit was launched the %s (> %.2f hours ago)" % ( dateTimeOldestJob, timeOutPerJob/3600.0 )
                        msg += "\nthis problem can be due to:"
                        msg += "\n* memory shortage, in that case, decrease the size of your jobs;"
                        msg += "\n* timeout, in that case, decrease the size of your jobs;"
                        msg += "\n* node failure or database error, in that case, launch the program again or ask your system administrator."
                        sys.stderr.write("%s\n" % msg)
                        sys.stderr.flush()
                        self.cleanJobGroup(groupid)
                        sys.exit(1)
        return nbTimeOuts

    ## Check if a job is still handled by SGE
    #
    # Runs 'qstat', redirecting its output to the file 'qstat_stdout' in the
    # current directory (removed afterwards), and looks for a line whose first
    # token is the jobid and whose third token is a prefix of the job name.
    # Exits the program (status 1) if 'qstat' fails.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return boolean True if the job is listed by qstat
    #
    def isJobStillHandledBySge(self, jobid, jobname):
        isJobInQstat = False
        qstatFile = "qstat_stdout"
        cmd = "qstat > %s" % qstatFile
        returnStatus = os.system(cmd)
        if returnStatus != 0:
            msg = "ERROR while launching 'qstat'"
            sys.stderr.write( "%s\n" % msg )
            sys.exit(1)
        # 'with' closes the file even if reading it fails
        with open(qstatFile, "r") as qstatFileHandler:
            lLines = qstatFileHandler.readlines()
        for line in lLines:
            tokens = line.split()
            # the job name is compared on the length of the qstat token only
            if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
                isJobInQstat = True
                break
        os.remove(qstatFile)
        return isJobInQstat

    ## Build the shell command submitting a job with SGE 'qsub'
    #
    # The launcher is piped to qsub through 'echo' and the output of qsub is
    # redirected to the file 'jobid.stdout'.
    #
    # @param job a job instance
    # @return string the command line
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        prg = "qsub"
        cmd += prg
        cmd += " -V"
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        cmd += " -cwd"
        if job.lResources != []:
            cmd += " -l \""
            cmd += " ".join(job.lResources)
            cmd += "\""
        if job.parallelEnvironment != "":
            cmd += " -pe " + job.parallelEnvironment
        cmd += " > jobid.stdout"
        return cmd

    ## Parse the job identifier from the output of SGE 'qsub'
    #
    # @param jobidFileHandler file handler opened on the output of qsub
    # @return integer the third space-separated token of the first line
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(" ")[2])


## Job persistence and submission for Torque
#
class TableJobAdaptatorTorque(TableJobAdaptator):

    ## No consistency check is done for Torque: the counter is returned unchanged
    #
    def _checkIfJobsTableAndJobsManagerInfoAreConsistent(self, nbTimeOuts, timeOutPerJob, groupid):
        return nbTimeOuts

    ## Build the shell command submitting a job with Torque 'qsub'
    #
    # The launcher is piped to qsub through 'echo', the working directory is
    # set to the current one, 'mem_free' is renamed 'mem' in the resources and
    # the output of qsub is redirected to the file 'jobid.stdout'.
    #
    # @param job a job instance
    # @return string the command line
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        prg = "qsub"
        cmd += prg
        cmd += " -V"
        cmd += " -d %s" % os.getcwd()
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        if job.lResources != []:
            cmd += " -l \""
            cmd += " ".join(job.lResources).replace("mem_free","mem")
            cmd += "\""
        cmd += " > jobid.stdout"
        return cmd

    ## Parse the job identifier from the output of Torque 'qsub'
    #
    # @param jobidFileHandler file handler opened on the output of qsub
    # @return integer the part of the first line before the first '.'
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(".")[0])