Mercurial > repos > yufei-luo > s_mart
diff commons/core/sql/RepetJob.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/sql/RepetJob.py Fri Jan 18 04:54:14 2013 -0500 @@ -0,0 +1,252 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + + +import os +import time +import sys +from commons.core.sql.DbMySql import DbMySql +from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory + +#TODO: to remove... => replace all RepetJob() by TableJobAdaptator()... 
## Methods for Job persistence
#
# Thin layer over DbMySql that records, updates and queries jobs stored in a
# MySQL job table, and delegates waiting/submission to a TableJobAdaptator.
#
# NOTE(review): every query below is built by string interpolation, so table
# names and job attributes must come from trusted code, not from user input.
#
class RepetJob( DbMySql ):


    ## Record a job
    #
    # Any existing row with the same groupid/jobname/queue is removed first,
    # then the job is inserted with status "waiting", the current local time
    # and "?" as last column value.
    #
    # @param job Job instance with the job information
    #
    def recordJob( self, job ):
        self.removeJob( job )
        sqlCmd = "INSERT INTO %s" % ( job.tablename )
        sqlCmd += " VALUES ("
        sqlCmd += " \"%s\"," % ( job.jobid )
        sqlCmd += " \"%s\"," % ( job.jobname )
        sqlCmd += " \"%s\"," % ( job.groupid )
        # double quotes would close the SQL string literal, hence the swap to single quotes
        sqlCmd += " \"%s\"," % ( job.command.replace("\"","\'") )
        sqlCmd += " \"%s\"," % ( job.launcher )
        sqlCmd += " \"%s\"," % ( job.queue )
        sqlCmd += " \"waiting\","
        sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
        sqlCmd += " \"?\" );"
        self.execute( sqlCmd )


    ## Remove a job from the job table
    #
    # The job is identified by its groupid, jobname and queue.
    #
    # @param job job instance to remove
    #
    def removeJob( self, job ):
        qry = "DELETE FROM %s" % ( job.tablename )
        qry += " WHERE groupid='%s'" % ( job.groupid )
        qry += " AND jobname='%s'" % ( job.jobname )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )


    ## Set the jobid of a job with the id of SGE
    #
    # @param job job instance
    # @param jobid integer (anything accepted by int(); a ValueError is raised otherwise)
    #
    def setJobIdFromSge( self, job, jobid ):
        qry = "UPDATE %s" % ( job.tablename )
        qry += " SET jobid='%i'" % ( int(jobid) )
        qry += " WHERE jobname='%s'" % ( job.jobname )
        qry += " AND groupid='%s'" % ( job.groupid )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )


    ## Get a job status
    #
    # Side effect: when the job has a non-zero jobid but an empty jobname,
    # the jobid is moved into jobname and jobid is reset to 0 on the instance.
    #
    # Exits the process with status 1 if several rows match the job.
    #
    # @param job a Job instance with the job information
    # @return the stored status, or "unknown" when no row matches
    #
    def getJobStatus( self, job ):
        if job.jobid != 0 and job.jobname == "":
            job.jobname = job.jobid
            job.jobid = 0
        qry = "SELECT status FROM %s" % ( job.tablename )
        qry += " WHERE groupid='%s'" % ( job.groupid )
        qry += " AND jobname='%s'" % ( job.jobname )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )
        res = self.fetchall()
        # Test the "no result" case first: len(None) would raise a TypeError.
        if res is None or len(res) == 0:
            return "unknown"
        if len(res) > 1:
            msg = "ERROR while getting job status: non-unique jobs"
            sys.stderr.write( "%s\n" % msg )
            sys.stderr.flush()
            sys.exit(1)
        return res[0][0]


    ## Change a job status
    #
    # The node column is updated at the same time from job.node.
    #
    # @param job a Job instance with the job information
    # @param status the new status (waiting,finished,error)
    # @param method db or file (not used by this implementation)
    #
    def changeJobStatus( self, job, status, method=""):
        sqlCmd = "UPDATE %s" % ( job.tablename )
        sqlCmd += " SET status='%s'" % ( status )
        sqlCmd += ",node='%s'" % ( job.node )
        sqlCmd += " WHERE groupid='%s'" % ( job.groupid )
        sqlCmd += " AND jobname='%s'" % ( job.jobname )
        sqlCmd += " AND queue='%s';" % ( job.queue )
        self.execute( sqlCmd )


    ## Get the number of jobs belonging to the desired groupid with the desired status.
    #
    # @param tablename string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus( self, tablename, groupid, status ):
        qry = "SELECT count(jobname) FROM %s" % ( tablename )
        qry += " WHERE groupid='%s'" % ( groupid )
        qry += " AND status='%s';" % ( status )
        self.execute( qry )
        res = self.fetchall()
        return int( res[0][0] )


    ## Clean all jobs from a job group
    #
    # Does nothing when the table does not exist.
    #
    # @param tablename table name to record the jobs
    # @param groupid a group identifier to record related job series
    #
    def cleanJobGroup( self, tablename, groupid ):
        if self.doesTableExist( tablename ):
            qry = "DELETE FROM %s WHERE groupid='%s';" % ( tablename, groupid )
            self.execute( qry )


    ## Check if there is unfinished job from a job group.
    #
    # @param tablename string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @return boolean False if the table does not exist or if every job of the group is 'finished'
    #
    def hasUnfinishedJob( self, tablename, groupid ):
        if not self.doesTableExist( tablename ):
            return False
        qry = "SELECT * FROM %s" % ( tablename )
        qry += " WHERE groupid='%s'" % ( groupid )
        qry += " and status!='finished';"
        self.execute( qry )
        res = self.fetchall()
        return len(res) != 0


    ## Check if a job is still handled by SGE
    #
    # Runs 'qstat', redirecting its output to the file "qstat_stdout" in the
    # current directory, and looks for a line matching the job.
    # Exits the process with status 1 if 'qstat' returns a non-zero status.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return boolean True if a matching line is found in the qstat output
    #
    def isJobStillHandledBySge( self, jobid, jobname ):
        qstatFile = "qstat_stdout"
        cmd = "qstat > %s" % ( qstatFile )
        try:
            returnStatus = os.system( cmd )
            if returnStatus != 0:
                msg = "ERROR while launching 'qstat'"
                sys.stderr.write( "%s\n" % msg )
                sys.exit(1)
            # 'with' guarantees the handle is closed even if reading fails
            with open( qstatFile, "r" ) as qstatFileHandler:
                for line in qstatFileHandler:
                    tokens = line.split()
                    # The 3rd column is compared with a prefix of jobname of the same
                    # length (presumably because qstat truncates names -- to confirm).
                    if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
                        return True
            return False
        finally:
            # always clean the temporary file, including when 'qstat' failed
            if os.path.exists( qstatFile ):
                os.remove( qstatFile )


    ## Wait job finished status from a job group.
    # Jobs are re-launched if error (max. 3 times)
    #
    # The work is delegated to the adaptator built by TableJobAdaptatorFactory.
    #
    # @param tableName string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @param checkInterval integer time lapse in seconds between two checks (default = 5)
    # @param maxRelaunch integer max nb of times a job in error is relaunched before exiting (default = 3)
    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
    #
    def waitJobGroup(self, tableName, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        iTJA = TableJobAdaptatorFactory.createInstance(self, tableName)
        iTJA.waitJobGroup(groupid, checkInterval, maxRelaunch, exitIfTooManyErrors, timeOutPerJob)


    ## Submit a job to a queue and record it in job table.
    #
    # The work is delegated to the adaptator built by TableJobAdaptatorFactory.
    #
    # @param job a job instance
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
    # @param checkInterval integer time lapse in seconds between two checks (default = 30)
    # @return the value returned by the adaptator's submitJob()
    #
    def submitJob( self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30 ):
        iTJA = TableJobAdaptatorFactory.createInstance(self, job.tablename)
        return iTJA.submitJob(job, verbose, maxNbWaitingJobs, checkInterval)


    ## Get the list of nodes where jobs of one group were executed
    #
    # @param tableName string table name where jobs are recorded
    # @param groupId string a group identifier of job series
    # @return lNodes list of nodes names (one entry per row, duplicates kept)
    #
    def getNodesListByGroupId( self, tableName, groupId ):
        qry = "SELECT node FROM %s" % tableName
        qry += " WHERE groupid='%s'" % groupId
        self.execute( qry )
        res = self.fetchall()
        return [ resTuple[0] for resTuple in res ]


    ## Get the name of the underlying database layer
    #
    # @return string always "DbMySql"
    #
    def getDbName(self):
        return "DbMySql"


    ## Split an identifier of the form "<jobid>.<nbJob>:<rest>"
    #
    # For instance "123.1-10:1" gives ("123", "1-10").
    # An IndexError is raised if the identifier contains no ".".
    #
    # @param jobid string identifier to split
    # @return tuple (jobid, nbJob), both as strings
    #
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        nbJob = tab[1].split(":")[0]
        return jobid, nbJob