comparison commons/core/sql/RepetJob.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31
32 import os
33 import time
34 import sys
35 from commons.core.sql.DbMySql import DbMySql
36 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
37
38 #TODO: to remove... => replace all RepetJob() by TableJobAdaptator()...
39 ## Methods for Job persistence
40 #
41 class RepetJob( DbMySql ):
42
43
44 ## Record a job
45 #
46 # @param job Job instance holding the job information
47 #
def recordJob(self, job):
    """Insert a job into its job table with the status "waiting".

    The job is first removed from the table (via removeJob), then a new row
    is inserted with the current local time as its time stamp.

    @param job Job instance; its tablename, jobid, jobname, groupid,
               command, launcher and queue attributes are read
    """
    self.removeJob(job)
    sqlHead = "INSERT INTO %s" % (job.tablename)
    lValues = [
        job.jobid,
        job.jobname,
        job.groupid,
        # values are delimited by double quotes, so those inside the
        # command are turned into single quotes
        job.command.replace("\"", "\'"),
        job.launcher,
        job.queue,
        "waiting",
        time.strftime("%Y-%m-%d %H:%M:%S"),
        # placeholder for the last column (presumably the node -- TODO confirm)
        "?",
    ]
    sqlValues = ",".join([" \"%s\"" % value for value in lValues])
    self.execute("%s VALUES (%s );" % (sqlHead, sqlValues))
62
63
64 ## Remove a job from the job table
65 #
66 # @param job: job instance to remove
67 #
def removeJob(self, job):
    """Delete from the job table every row matching the given job.

    Rows are matched on group identifier, job name and queue.

    @param job Job instance; its tablename, groupid, jobname and queue
               attributes are read
    """
    sqlTemplate = "DELETE FROM %s WHERE groupid='%s' AND jobname='%s' AND queue='%s';"
    self.execute(sqlTemplate % (job.tablename, job.groupid, job.jobname, job.queue))
74
75
76 ## Set the jobid of a job with the id of SGE
77 #
78 # @param job job instance
79 # @param jobid integer
80 #
def setJobIdFromSge(self, job, jobid):
    """Store an identifier in the jobid column of the row matching a job.

    The row is matched on job name, group identifier and queue.

    @param job   Job instance; its tablename, jobname, groupid and queue
                 attributes are read
    @param jobid identifier to store; it is converted with int() first
    """
    sqlTemplate = ("UPDATE %s SET jobid='%i'"
                   " WHERE jobname='%s' AND groupid='%s' AND queue='%s';")
    lFields = (job.tablename, int(jobid), job.jobname, job.groupid, job.queue)
    self.execute(sqlTemplate % lFields)
88
89
90 ## Get a job status
91 #
92 # @param job: a Job instance holding the job information
93 #
def getJobStatus(self, job):
    """Return the status recorded in the job table for a job.

    The row is looked up on group identifier, job name and queue.

    Side effect: when the job has a non-zero jobid and an empty jobname,
    the jobid is moved into jobname and jobid is reset to 0 before the
    lookup.

    @param job Job instance; its tablename, jobid, jobname, groupid and
               queue attributes are read (jobid and jobname may be modified)
    @return the status of the single matching row, or "unknown" when no
            row matches
    Exits the program with status 1 when several rows match.
    """
    if job.jobid != 0 and job.jobname == "":
        job.jobname = job.jobid
        job.jobid = 0
    qry = "SELECT status FROM %s" % (job.tablename)
    qry += " WHERE groupid='%s'" % (job.groupid)
    qry += " AND jobname='%s'" % (job.jobname)
    qry += " AND queue='%s';" % (job.queue)
    self.execute(qry)
    res = self.fetchall()
    # Test for "no result" before taking len(): calling len() on None
    # would raise a TypeError instead of reporting an unknown job.
    if res is None or len(res) == 0:
        return "unknown"
    if len(res) > 1:
        msg = "ERROR while getting job status: non-unique jobs"
        sys.stderr.write("%s\n" % msg)
        sys.stderr.flush()
        sys.exit(1)
    return res[0][0]
112
113
114 ## Change a job status
115 #
116 # @param job: a Job instance holding the job information
117 # @param status: the new status (waiting,finished,error)
118 # @param method: db or file
119 #
def changeJobStatus(self, job, status, method=""):
    """Update the status and node columns of the row matching a job.

    The row is matched on group identifier, job name and queue.

    @param job    Job instance; its tablename, node, groupid, jobname and
                  queue attributes are read
    @param status new status to record
    @param method not used by this implementation
    """
    sqlTemplate = ("UPDATE %s SET status='%s',node='%s'"
                   " WHERE groupid='%s' AND jobname='%s' AND queue='%s';")
    lFields = (job.tablename, status, job.node,
               job.groupid, job.jobname, job.queue)
    self.execute(sqlTemplate % lFields)
128
129
130 ## Get the number of jobs belonging to the desired groupid with the desired status.
131 #
132 # @param tablename string table name to record the jobs
133 # @param groupid string a group identifier to record related job series
134 # @param status string job status (waiting, running, finished, error)
135 # @return int
136 #
def getCountStatus(self, tablename, groupid, status):
    """Count the jobs of a group that have a given status.

    @param tablename string name of the job table
    @param groupid   string group identifier of the job series
    @param status    string status to count
    @return int value of the first column of the first result row
    """
    sqlTemplate = "SELECT count(jobname) FROM %s WHERE groupid='%s' AND status='%s';"
    self.execute(sqlTemplate % (tablename, groupid, status))
    firstRow = self.fetchall()[0]
    return int(firstRow[0])
144
145
146 ## Remove all the jobs of a job group
147 #
148 # @param tablename table name to record the jobs
149 # @param groupid: a group identifier to record related job series
150 #
def cleanJobGroup(self, tablename, groupid):
    """Delete every row of a job group, provided the job table exists.

    Nothing is executed when doesTableExist() reports the table as missing.

    @param tablename string name of the job table
    @param groupid   string group identifier of the job series
    """
    if not self.doesTableExist(tablename):
        return
    self.execute("DELETE FROM %s WHERE groupid='%s';" % (tablename, groupid))
155
156
157 ## Check if a job group still has unfinished jobs.
158 #
159 # @param tablename string table name to record the jobs
160 # @param groupid string a group identifier to record related job series
161 #
def hasUnfinishedJob(self, tablename, groupid):
    """Tell whether a job group has a job whose status is not 'finished'.

    @param tablename string name of the job table
    @param groupid   string group identifier of the job series
    @return False when the table does not exist or when no row of the
            group has a status different from 'finished', True otherwise
    """
    if self.doesTableExist(tablename):
        sqlTemplate = "SELECT * FROM %s WHERE groupid='%s' and status!='finished';"
        self.execute(sqlTemplate % (tablename, groupid))
        lUnfinished = self.fetchall()
        return len(lUnfinished) != 0
    return False
173
174
175 ## Check if a job is still handled by SGE
176 #
177 # @param jobid string job identifier
178 # @param jobname string job name
179 #
def isJobStillHandledBySge(self, jobid, jobname):
    """Tell whether a job is listed in the output of 'qstat'.

    A line of the 'qstat' output matches when it has more than three
    whitespace-separated fields, its first field equals str(jobid), and
    its third field is a prefix of jobname.

    The output is read through a pipe rather than through a fixed-name
    file in the current directory. Concurrent callers sharing a directory
    can therefore no longer overwrite or delete each other's data, and no
    file is left behind when 'qstat' fails.

    @param jobid   job identifier (compared as a string)
    @param jobname string job name
    @return True if a matching line is found, False otherwise
    Exits the program with status 1 when 'qstat' cannot be run or returns
    a non-zero status.
    """
    import subprocess

    qstatOutput = ""
    try:
        process = subprocess.Popen(["qstat"],
                                   stdout=subprocess.PIPE,
                                   universal_newlines=True)
        qstatOutput = process.communicate()[0]
        returnStatus = process.returncode
    except OSError:
        # 'qstat' could not even be started: handled like a failed run
        returnStatus = 1
    if returnStatus != 0:
        msg = "ERROR while launching 'qstat'"
        sys.stderr.write("%s\n" % msg)
        sys.exit(1)

    expectedId = str(jobid)
    for line in qstatOutput.splitlines():
        tokens = line.split()
        # the name field is only compared as a prefix of the job name
        if len(tokens) > 3 and tokens[0] == expectedId and jobname.startswith(tokens[2]):
            return True
    return False
199
200
201 ## Wait until the jobs of a job group have the status 'finished'.
202 # Jobs in error are re-launched (max. 3 times by default)
203 #
204 # @param tableName string table name to record the jobs
205 # @param groupid string a group identifier to record related job series
206 # @param checkInterval integer time interval in seconds between two checks (default = 5)
207 # @param maxRelaunch integer max nb of times a job in error is relaunched before exiting (default = 3)
208 # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
209 # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
210 #
def waitJobGroup(self, tableName, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
    """Delegate the wait on a job group to a table job adaptator.

    The adaptator is obtained from TableJobAdaptatorFactory.createInstance()
    for this connection and table; all other arguments are forwarded
    unchanged, in order, to its waitJobGroup() method.

    @param tableName           string name of the job table
    @param groupid             string group identifier of the job series
    @param checkInterval       integer forwarded to the adaptator (default = 5)
    @param maxRelaunch         integer forwarded to the adaptator (default = 3)
    @param exitIfTooManyErrors boolean forwarded to the adaptator (default = True)
    @param timeOutPerJob       integer forwarded to the adaptator (default = 60*60)
    """
    jobAdaptator = TableJobAdaptatorFactory.createInstance(self, tableName)
    lForwarded = (groupid, checkInterval, maxRelaunch, exitIfTooManyErrors, timeOutPerJob)
    jobAdaptator.waitJobGroup(*lForwarded)
214
215 ## Submit a job to a queue and record it in job table.
216 #
217 # @param job a job instance
218 # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
219 # @param checkInterval integer time interval in seconds between two checks (default = 30)
220 # @param verbose integer (default = 0)
221 #
def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
    """Delegate the submission of a job to a table job adaptator.

    The adaptator is obtained from TableJobAdaptatorFactory.createInstance()
    for this connection and the table of the job; the arguments are
    forwarded unchanged, in order, to its submitJob() method.

    @param job              Job instance; its tablename attribute selects the table
    @param verbose          integer forwarded to the adaptator (default = 0)
    @param maxNbWaitingJobs integer forwarded to the adaptator (default = 10000)
    @param checkInterval    integer forwarded to the adaptator (default = 30)
    @return whatever the adaptator's submitJob() returns
    """
    jobAdaptator = TableJobAdaptatorFactory.createInstance(self, job.tablename)
    submitStatus = jobAdaptator.submitJob(job, verbose, maxNbWaitingJobs, checkInterval)
    return submitStatus
225
226
227 ## Get the list of nodes where jobs of one group were executed
228 #
229 # @param tablename string table name where jobs are recorded
230 # @param groupid string a group identifier of job series
231 # @return lNodes list of nodes names
232 #
def getNodesListByGroupId(self, tableName, groupId):
    """List the node column of every row belonging to a job group.

    @param tableName string name of the job table
    @param groupId   string group identifier of the job series
    @return list with the first column of each result row, in result order
    """
    sqlSelect = "SELECT node FROM %s" % tableName
    sqlWhere = " WHERE groupid='%s'" % groupId
    self.execute(sqlSelect + sqlWhere)
    return [row[0] for row in self.fetchall()]
242
def getDbName(self):
    """Return the constant string "DbMySql"."""
    dbName = "DbMySql"
    return dbName
245
246 def _getJobidAndNbJob(self, jobid) :
247 tab = []
248 tab = jobid.split(".")
249 jobid = tab[0]
250 tab = tab[1].split(":")
251 nbJob = tab[0]
252 return jobid, nbJob