Mercurial > repos > yufei-luo > s_mart
view commons/core/sql/RepetJob.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line source
# Copyright INRA (Institut National de la Recherche Agronomique) # http://www.inra.fr # http://urgi.versailles.inra.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. import os import time import sys from commons.core.sql.DbMySql import DbMySql from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory #TODO: to remove... => replace all RepetJob() by TableJobAdaptator()... 
## Methods for Job persistence
#
# Thin layer over DbMySql that stores jobs in a SQL table and queries or
# updates their state. Every query is built by string interpolation from the
# attributes of the job (tablename, groupid, jobname, queue, ...), so those
# values are expected to come from trusted code.
# NOTE(review): the queries are not parameterized; confirm that no
# user-controlled text can reach these attributes.
#
class RepetJob( DbMySql ):

    ## Record a job
    #
    # Any previous row with the same groupid, jobname and queue is deleted
    # first, then a new row is inserted with the status "waiting", the current
    # local time, and "?" as last value (presumably the node, which is not
    # known yet -- to be confirmed against the table schema).
    #
    # @param job Job instance with the job informations
    #
    def recordJob( self, job ):
        self.removeJob( job )
        sqlCmd = "INSERT INTO %s" % ( job.tablename )
        sqlCmd += " VALUES ("
        sqlCmd += " \"%s\"," % ( job.jobid )
        sqlCmd += " \"%s\"," % ( job.jobname )
        sqlCmd += " \"%s\"," % ( job.groupid )
        # double quotes delimit the SQL values, hence they are replaced by
        # single quotes inside the command
        sqlCmd += " \"%s\"," % ( job.command.replace("\"","\'") )
        sqlCmd += " \"%s\"," % ( job.launcher )
        sqlCmd += " \"%s\"," % ( job.queue )
        sqlCmd += " \"waiting\","
        sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
        sqlCmd += " \"?\" );"
        self.execute( sqlCmd )


    ## Remove a job from the job table
    #
    # The job is identified by its groupid, jobname and queue.
    #
    # @param job: job instance to remove
    #
    def removeJob( self, job ):
        qry = "DELETE FROM %s" % ( job.tablename )
        qry += " WHERE groupid='%s'" % ( job.groupid )
        qry += " AND jobname='%s'" % ( job.jobname )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )


    ## Set the jobid of a job with the id of SGE
    #
    # @param job job instance
    # @param jobid integer (anything accepted by int(); otherwise int() raises)
    #
    def setJobIdFromSge( self, job, jobid ):
        qry = "UPDATE %s" % ( job.tablename )
        qry += " SET jobid='%i'" % ( int(jobid) )
        qry += " WHERE jobname='%s'" % ( job.jobname )
        qry += " AND groupid='%s'" % ( job.groupid )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )


    ## Get a job status
    #
    # Side effect: when the job has a non-zero jobid but an empty jobname, the
    # jobid is moved into jobname and jobid is reset to 0 before querying.
    #
    # @param job: a Job instance with the job informations
    # @return the status read from the table, or "unknown" when no row matches
    # @note the process exits with status 1 when several rows match
    #
    def getJobStatus( self, job ):
        if job.jobid != 0 and job.jobname == "":
            job.jobname = job.jobid
            job.jobid = 0
        qry = "SELECT status FROM %s" % ( job.tablename )
        qry += " WHERE groupid='%s'" % ( job.groupid )
        qry += " AND jobname='%s'" % ( job.jobname )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )
        res = self.fetchall()
        # test the empty/None case first: calling len() on None would raise
        if not res:
            return "unknown"
        if len(res) > 1:
            msg = "ERROR while getting job status: non-unique jobs"
            sys.stderr.write( "%s\n" % msg )
            sys.stderr.flush()
            sys.exit(1)
        return res[0][0]


    ## Change a job status
    #
    # The node column is updated as well, from job.node.
    #
    # @param job: a Job instance with the job informations
    # @param status: the new status (waiting,finished,error)
    # @param method: db or file (not used by this implementation)
    #
    def changeJobStatus( self, job, status, method=""):
        sqlCmd = "UPDATE %s" % ( job.tablename )
        sqlCmd += " SET status='%s'" % ( status )
        sqlCmd += ",node='%s'" % ( job.node )
        sqlCmd += " WHERE groupid='%s'" % ( job.groupid )
        sqlCmd += " AND jobname='%s'" % ( job.jobname )
        sqlCmd += " AND queue='%s';" % ( job.queue )
        self.execute( sqlCmd )


    ## Get the number of jobs belonging to the desired groupid with the desired status.
    #
    # @param tablename string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus( self, tablename, groupid, status ):
        qry = "SELECT count(jobname) FROM %s" % ( tablename )
        qry += " WHERE groupid='%s'" % ( groupid )
        qry += " AND status='%s';" % ( status )
        self.execute( qry )
        res = self.fetchall()
        return int( res[0][0] )


    ## Clean all job from a job group
    #
    # Nothing is done when the table does not exist.
    #
    # @param tablename table name to record the jobs
    # @param groupid: a group identifier to record related job series
    #
    def cleanJobGroup( self, tablename, groupid ):
        if self.doesTableExist( tablename ):
            qry = "DELETE FROM %s WHERE groupid='%s';" % ( tablename, groupid )
            self.execute( qry )


    ## Check if there is unfinished job from a job group.
    #
    # @param tablename string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @return boolean False when the table does not exist or when every job of
    #         the group has the status "finished", True otherwise
    #
    def hasUnfinishedJob( self, tablename, groupid ):
        if not self.doesTableExist( tablename ):
            return False
        qry = "SELECT * FROM %s" % ( tablename )
        qry += " WHERE groupid='%s'" % ( groupid )
        qry += " and status!='finished';"
        self.execute( qry )
        res = self.fetchall()
        return len(res) != 0


    ## Check if a job is still handled by SGE
    #
    # The output of 'qstat' is redirected to the file "qstat_stdout" in the
    # current directory, parsed, then the file is removed.
    # NOTE(review): the file name is fixed, so two processes running this
    # method in the same directory may interfere with each other.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return boolean True when a line of the output has more than 3 fields,
    #         its first field equals jobid and its third field is a prefix of
    #         jobname
    # @note the process exits with status 1 when 'qstat' returns a non-zero status
    #
    def isJobStillHandledBySge( self, jobid, jobname ):
        isJobInQstat = False
        qstatFile = "qstat_stdout"
        cmd = "qstat > %s" % ( qstatFile )
        try:
            returnStatus = os.system( cmd )
            if returnStatus != 0:
                msg = "ERROR while launching 'qstat'"
                sys.stderr.write( "%s\n" % msg )
                sys.exit(1)
            with open( qstatFile, "r" ) as qstatFileHandler:
                for line in qstatFileHandler:
                    tokens = line.split()
                    # the name shown in the third field may be shorter than
                    # jobname, hence only a prefix of jobname is compared
                    if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
                        isJobInQstat = True
                        break
        finally:
            # the redirection creates the file even when 'qstat' fails, so it
            # is removed on every exit path
            if os.path.exists( qstatFile ):
                os.remove( qstatFile )
        return isJobInQstat


    ## Wait job finished status from a job group.
    #
    # The work is delegated to the adaptator built by TableJobAdaptatorFactory
    # for this database and this table.
    #
    # @param tableName string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @param checkInterval integer time laps in seconds between two checks (default = 5)
    # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3)
    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
    #
    def waitJobGroup(self, tableName, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        iTJA = TableJobAdaptatorFactory.createInstance(self, tableName)
        iTJA.waitJobGroup(groupid, checkInterval, maxRelaunch, exitIfTooManyErrors, timeOutPerJob)


    ## Submit a job to a queue and record it in job table.
    #
    # The work is delegated to the adaptator built by TableJobAdaptatorFactory
    # for this database and the table of the job.
    #
    # @param job a job instance
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
    # @param checkInterval integer time laps in seconds between two checks (default = 30)
    # @return the value returned by the submitJob() method of the adaptator
    #
    def submitJob( self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30 ):
        iTJA = TableJobAdaptatorFactory.createInstance(self, job.tablename)
        return iTJA.submitJob(job, verbose, maxNbWaitingJobs, checkInterval)


    ## Get the list of nodes where jobs of one group were executed
    #
    # @param tableName string table name where jobs are recorded
    # @param groupId string a group identifier of job series
    # @return lNodes list of nodes names, one per row of the group
    #
    def getNodesListByGroupId( self, tableName, groupId ):
        qry = "SELECT node FROM %s" % tableName
        qry += " WHERE groupid='%s'" % groupId
        self.execute( qry )
        res = self.fetchall()
        lNodes = [ resTuple[0] for resTuple in res ]
        return lNodes


    ## Get the name of the database type
    #
    # @return string always "DbMySql"
    #
    def getDbName(self):
        return "DbMySql"


    ## Split a composite job identifier
    #
    # The identifier is split on ".": the first field is the job id, and the
    # part of the second field located before the first ":" is the number of
    # jobs (e.g. "12.1-5:1" gives ("12", "1-5")).
    # NOTE(review): presumably the identifier of a SGE array job -- to be confirmed.
    #
    # @param jobid string identifier containing at least one "."
    # @return tuple of two strings (jobid, nbJob)
    # @note an IndexError is raised when the identifier contains no "."
    #
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        nbJob = tab[1].split(":")[0]
        return jobid, nbJob