Mercurial > repos > yufei-luo > s_mart
comparison commons/core/sql/RepetJob.py @ 6:769e306b7933
Change the repository level.
| author | yufei-luo |
|---|---|
| date | Fri, 18 Jan 2013 04:54:14 -0500 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 5:ea3082881bf8 | 6:769e306b7933 |
|---|---|
| 1 # Copyright INRA (Institut National de la Recherche Agronomique) | |
| 2 # http://www.inra.fr | |
| 3 # http://urgi.versailles.inra.fr | |
| 4 # | |
| 5 # This software is governed by the CeCILL license under French law and | |
| 6 # abiding by the rules of distribution of free software. You can use, | |
| 7 # modify and/ or redistribute the software under the terms of the CeCILL | |
| 8 # license as circulated by CEA, CNRS and INRIA at the following URL | |
| 9 # "http://www.cecill.info". | |
| 10 # | |
| 11 # As a counterpart to the access to the source code and rights to copy, | |
| 12 # modify and redistribute granted by the license, users are provided only | |
| 13 # with a limited warranty and the software's author, the holder of the | |
| 14 # economic rights, and the successive licensors have only limited | |
| 15 # liability. | |
| 16 # | |
| 17 # In this respect, the user's attention is drawn to the risks associated | |
| 18 # with loading, using, modifying and/or developing or reproducing the | |
| 19 # software by the user in light of its specific status of free software, | |
| 20 # that may mean that it is complicated to manipulate, and that also | |
| 21 # therefore means that it is reserved for developers and experienced | |
| 22 # professionals having in-depth computer knowledge. Users are therefore | |
| 23 # encouraged to load and test the software's suitability as regards their | |
| 24 # requirements in conditions enabling the security of their systems and/or | |
| 25 # data to be ensured and, more generally, to use and operate it in the | |
| 26 # same conditions as regards security. | |
| 27 # | |
| 28 # The fact that you are presently reading this means that you have had | |
| 29 # knowledge of the CeCILL license and that you accept its terms. | |
| 30 | |
| 31 | |
| 32 import os | |
| 33 import time | |
| 34 import sys | |
| 35 from commons.core.sql.DbMySql import DbMySql | |
| 36 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory | |
| 37 | |
| 38 #TODO: to remove... => replace all RepetJob() by TableJobAdaptator()... | |
| 39 ## Methods for Job persistence | |
| 40 # | |
| 41 class RepetJob( DbMySql ): | |
| 42 | |
| 43 | |
| 44 ## Record a job | |
| 45 # | |
| 46 # @param job Job instance holding the job information | |
| 47 # | |
def recordJob(self, job):
    """Record a job in its job table, replacing any previous record.

    self.removeJob(job) is called first, then one row is inserted with the
    status "waiting", the current local time and "?" as the last value.

    @param job Job instance; its tablename, jobid, jobname, groupid,
               command, launcher and queue attributes are read
    """
    self.removeJob(job)
    table = job.tablename
    values = [
        job.jobid,
        job.jobname,
        job.groupid,
        # Every value is wrapped in double quotes below, so double quotes
        # inside the command are turned into single quotes.
        job.command.replace('"', "'"),
        job.launcher,
        job.queue,
        "waiting",
        time.strftime("%Y-%m-%d %H:%M:%S"),
        "?",
    ]
    quoted = ", ".join(['"%s"' % value for value in values])
    self.execute("INSERT INTO %s VALUES ( %s );" % (table, quoted))
| 62 | |
| 63 | |
| 64 ## Remove a job from the job table | |
| 65 # | |
| 66 # @param job: job instance to remove | |
| 67 # | |
def removeJob(self, job):
    """Delete from the job table the rows matching a job.

    Rows are selected on the job's groupid, jobname and queue.

    @param job Job instance; its tablename, groupid, jobname and queue
               attributes are read
    """
    template = "DELETE FROM %s WHERE groupid='%s' AND jobname='%s' AND queue='%s';"
    self.execute(template % (job.tablename, job.groupid, job.jobname, job.queue))
| 74 | |
| 75 | |
| 76 ## Set the jobid of a job with the id of SGE | |
| 77 # | |
| 78 # @param job job instance | |
| 79 # @param jobid integer | |
| 80 # | |
def setJobIdFromSge(self, job, jobid):
    """Store the identifier given by SGE in the row of a recorded job.

    The row is selected on the job's jobname, groupid and queue.

    @param job   Job instance; its tablename, jobname, groupid and queue
                 attributes are read
    @param jobid identifier to store; converted with int()
    """
    sgeId = int(jobid)
    clauses = [
        "UPDATE %s" % job.tablename,
        "SET jobid='%i'" % sgeId,
        "WHERE jobname='%s'" % job.jobname,
        "AND groupid='%s'" % job.groupid,
        "AND queue='%s';" % job.queue,
    ]
    self.execute(" ".join(clauses))
| 88 | |
| 89 | |
| 90 ## Get a job status | |
| 91 # | |
| 92 # @param job: a Job instance holding the job information | |
| 93 # | |
def getJobStatus(self, job):
    """Return the status recorded for a job, or "unknown" if none is found.

    Side effect on job: when job.jobid is non-zero and job.jobname is the
    empty string, the job id is moved into job.jobname and job.jobid is
    reset to 0 before the lookup.

    The process exits with status 1 (after writing a message on stderr) when
    several rows match the job.

    @param job Job instance; its tablename, groupid, jobname, jobid and queue
               attributes are read
    @return the value of the status column, or "unknown" when the query
            yields no row
    """
    if job.jobid != 0 and job.jobname == "":
        job.jobname = job.jobid
        job.jobid = 0
    qry = "SELECT status FROM %s" % (job.tablename)
    qry += " WHERE groupid='%s'" % (job.groupid)
    qry += " AND jobname='%s'" % (job.jobname)
    qry += " AND queue='%s';" % (job.queue)
    self.execute(qry)
    res = self.fetchall()
    # Test for "no result" before calling len(): the result may be None,
    # and len(None) would raise a TypeError.
    if res is None or len(res) == 0:
        return "unknown"
    if len(res) > 1:
        msg = "ERROR while getting job status: non-unique jobs"
        sys.stderr.write("%s\n" % msg)
        sys.stderr.flush()
        sys.exit(1)
    return res[0][0]
| 112 | |
| 113 | |
| 114 ## Change a job status | |
| 115 # | |
| 116 # @param job: a Job instance holding the job information | |
| 117 # @param status: the new status (waiting,finished,error) | |
| 118 # @param method: db or file | |
| 119 # | |
def changeJobStatus(self, job, status, method=""):
    """Update the status and the node recorded for a job.

    The row is selected on the job's groupid, jobname and queue.

    @param job    Job instance; its tablename, node, groupid, jobname and
                  queue attributes are read
    @param status new status value
    @param method not used by this implementation
    """
    assignments = "status='%s',node='%s'" % (status, job.node)
    criteria = "groupid='%s' AND jobname='%s' AND queue='%s'" % (
        job.groupid, job.jobname, job.queue)
    self.execute("UPDATE %s SET %s WHERE %s;" % (job.tablename, assignments, criteria))
| 128 | |
| 129 | |
| 130 ## Get the number of jobs belonging to the desired groupid with the desired status. | |
| 131 # | |
| 132 # @param tablename string table name to record the jobs | |
| 133 # @param groupid string a group identifier to record related job series | |
| 134 # @param status string job status (waiting, running, finished, error) | |
| 135 # @return int | |
| 136 # | |
def getCountStatus(self, tablename, groupid, status):
    """Count the jobs of a group that have a given status.

    @param tablename string name of the job table
    @param groupid   string group identifier of the job series
    @param status    string status to count
    @return int number of matching jobs
    """
    query = "SELECT count(jobname) FROM %s WHERE groupid='%s' AND status='%s';" % (
        tablename, groupid, status)
    self.execute(query)
    firstRow = self.fetchall()[0]
    return int(firstRow[0])
| 144 | |
| 145 | |
| 146 ## Remove all jobs belonging to a job group | |
| 147 # | |
| 148 # @param tablename table name to record the jobs | |
| 149 # @param groupid: a group identifier to record related job series | |
| 150 # | |
def cleanJobGroup(self, tablename, groupid):
    """Delete every job of a group; do nothing if the table does not exist.

    @param tablename name of the job table
    @param groupid   group identifier of the job series to delete
    """
    if not self.doesTableExist(tablename):
        return
    self.execute("DELETE FROM %s WHERE groupid='%s';" % (tablename, groupid))
| 155 | |
| 156 | |
| 157 ## Check whether a job group still has unfinished jobs. | |
| 158 # | |
| 159 # @param tablename string table name to record the jobs | |
| 160 # @param groupid string a group identifier to record related job series | |
| 161 # | |
def hasUnfinishedJob(self, tablename, groupid):
    """Tell whether a group has at least one job whose status is not 'finished'.

    @param tablename string name of the job table
    @param groupid   string group identifier of the job series
    @return False if the table does not exist or no such job is found,
            True otherwise
    """
    if self.doesTableExist(tablename):
        query = " ".join([
            "SELECT * FROM %s" % tablename,
            "WHERE groupid='%s'" % groupid,
            "and status!='finished';",
        ])
        self.execute(query)
        return len(self.fetchall()) != 0
    return False
| 173 | |
| 174 | |
| 175 ## Check if a job is still handled by SGE | |
| 176 # | |
| 177 # @param jobid string job identifier | |
| 178 # @param jobname string job name | |
| 179 # | |
def isJobStillHandledBySge(self, jobid, jobname):
    """Check whether a job is listed in the output of 'qstat'.

    'qstat' is run through the shell with its output redirected to the
    scratch file "qstat_stdout" in the current directory. A line matches when
    it has more than three whitespace-separated fields, its first field
    equals str(jobid) and its third field is a prefix of jobname.

    The scratch file is always removed and its handle always closed,
    including when 'qstat' fails or reading raises.

    The process exits with status 1 (after writing a message on stderr) when
    'qstat' returns a non-zero status.

    @param jobid   job identifier, compared as a string
    @param jobname string job name
    @return True if a matching line is found, False otherwise
    """
    qstatFile = "qstat_stdout"
    try:
        if os.system("qstat > %s" % qstatFile) != 0:
            msg = "ERROR while launching 'qstat'"
            sys.stderr.write("%s\n" % msg)
            sys.exit(1)
        wantedId = str(jobid)
        with open(qstatFile, "r") as qstatFileHandler:
            for line in qstatFileHandler:
                tokens = line.split()
                # The third field is only compared as a prefix of jobname.
                if len(tokens) > 3 and tokens[0] == wantedId and jobname.startswith(tokens[2]):
                    return True
        return False
    finally:
        # The shell redirection creates the file even when qstat fails.
        if os.path.exists(qstatFile):
            os.remove(qstatFile)
| 199 | |
| 200 | |
| 201 ## Wait until all jobs of a job group are finished. | |
| 202 # Jobs in error are re-launched (max. 3 times by default) | |
| 203 # | |
| 204 # @param tableName string table name to record the jobs | |
| 205 # @param groupid string a group identifier to record related job series | |
| 206 # @param checkInterval integer time laps in seconds between two checks (default = 5) | |
| 207 # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3) | |
| 208 # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True) | |
| 209 # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h) | |
| 210 # | |
def waitJobGroup(self, tableName, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
    """Wait for a job group by delegating to a table job adaptator.

    The adaptator is created with TableJobAdaptatorFactory.createInstance for
    this connection and table. Its waitJobGroup method receives the remaining
    arguments unchanged, and its result is not returned.

    @param tableName           string name of the job table
    @param groupid             string group identifier of the job series
    @param checkInterval       integer forwarded as is (default = 5)
    @param maxRelaunch         integer forwarded as is (default = 3)
    @param exitIfTooManyErrors boolean forwarded as is (default = True)
    @param timeOutPerJob       integer forwarded as is (default = 60*60)
    """
    forwarded = (groupid, checkInterval, maxRelaunch, exitIfTooManyErrors, timeOutPerJob)
    adaptator = TableJobAdaptatorFactory.createInstance(self, tableName)
    adaptator.waitJobGroup(*forwarded)
| 214 | |
| 215 ## Submit a job to a queue and record it in job table. | |
| 216 # | |
| 217 # @param job a job instance | |
| 218 # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000) | |
| 219 # @param checkInterval integer time laps in seconds between two checks (default = 30) | |
| 220 # @param verbose integer (default = 0) | |
| 221 # | |
def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
    """Submit a job by delegating to a table job adaptator.

    The adaptator is created with TableJobAdaptatorFactory.createInstance for
    this connection and the job's table. Its submitJob method receives the
    arguments unchanged.

    @param job              Job instance; its tablename attribute is read
    @param verbose          integer forwarded as is (default = 0)
    @param maxNbWaitingJobs integer forwarded as is (default = 10000)
    @param checkInterval    integer forwarded as is (default = 30)
    @return whatever the adaptator's submitJob returns
    """
    forwarded = (job, verbose, maxNbWaitingJobs, checkInterval)
    adaptator = TableJobAdaptatorFactory.createInstance(self, job.tablename)
    return adaptator.submitJob(*forwarded)
| 225 | |
| 226 | |
| 227 ## Get the list of nodes where jobs of one group were executed | |
| 228 # | |
| 229 # @param tableName string table name where jobs are recorded | |
| 230 # @param groupId string a group identifier of job series | |
| 231 # @return lNodes list of nodes names | |
| 232 # | |
def getNodesListByGroupId(self, tableName, groupId):
    """List the node value of every job recorded for a group.

    @param tableName string name of the job table
    @param groupId   string group identifier of the job series
    @return list with the first column of each fetched row, in fetch order
    """
    self.execute("SELECT node FROM %s WHERE groupid='%s'" % (tableName, groupId))
    return [row[0] for row in self.fetchall()]
| 242 | |
def getDbName(self):
    """Return the fixed name "DbMySql".

    @return string "DbMySql"
    """
    dbName = "DbMySql"
    return dbName
| 245 | |
def _getJobidAndNbJob(self, jobid):
    """Split a dotted job identifier into two parts.

    The first part is the text before the first '.'. The second part is the
    text between the first and second '.', cut at its first ':'.
    An IndexError is raised when jobid contains no '.'.

    @param jobid string identifier to split
    @return tuple (jobid, nbJob) of strings
    """
    dottedParts = jobid.split(".")
    baseId = dottedParts[0]
    nbJob = dottedParts[1].split(":")[0]
    return baseId, nbJob
