6
|
1 # Copyright INRA (Institut National de la Recherche Agronomique)
|
|
2 # http://www.inra.fr
|
|
3 # http://urgi.versailles.inra.fr
|
|
4 #
|
|
5 # This software is governed by the CeCILL license under French law and
|
|
6 # abiding by the rules of distribution of free software. You can use,
|
|
7 # modify and/ or redistribute the software under the terms of the CeCILL
|
|
8 # license as circulated by CEA, CNRS and INRIA at the following URL
|
|
9 # "http://www.cecill.info".
|
|
10 #
|
|
11 # As a counterpart to the access to the source code and rights to copy,
|
|
12 # modify and redistribute granted by the license, users are provided only
|
|
13 # with a limited warranty and the software's author, the holder of the
|
|
14 # economic rights, and the successive licensors have only limited
|
|
15 # liability.
|
|
16 #
|
|
17 # In this respect, the user's attention is drawn to the risks associated
|
|
18 # with loading, using, modifying and/or developing or reproducing the
|
|
19 # software by the user in light of its specific status of free software,
|
|
20 # that may mean that it is complicated to manipulate, and that also
|
|
21 # therefore means that it is reserved for developers and experienced
|
|
22 # professionals having in-depth computer knowledge. Users are therefore
|
|
23 # encouraged to load and test the software's suitability as regards their
|
|
24 # requirements in conditions enabling the security of their systems and/or
|
|
25 # data to be ensured and, more generally, to use and operate it in the
|
|
26 # same conditions as regards security.
|
|
27 #
|
|
28 # The fact that you are presently reading this means that you have had
|
|
29 # knowledge of the CeCILL license and that you accept its terms.
|
|
30
|
|
31
|
|
32 import os
|
|
33 import time
|
|
34 import sys
|
|
35 from commons.core.sql.DbMySql import DbMySql
|
|
36 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
|
|
37
|
|
38 #TODO: to remove... => replace all RepetJob() by TableJobAdaptator()...
|
|
39 ## Methods for Job persistence
|
|
40 #
|
|
class RepetJob(DbMySql):
    """Job persistence helpers built on top of a DbMySql connection.

    Every method builds an SQL statement as a plain string and runs it
    through the inherited execute() / fetchall() methods. Waiting for and
    submitting jobs are delegated to the adaptator returned by
    TableJobAdaptatorFactory.

    NOTE(review): values are interpolated into the SQL text without any
    escaping (only double quotes of the job command are replaced); callers
    must not pass untrusted strings.
    """

    ## Record a job
    #
    # Any row already stored for the same (groupid, jobname, queue) is removed
    # first. The job is then inserted with the status "waiting", the current
    # local time and "?" as last value (presumably the node, not known yet).
    #
    # @param job Job instance with the job informations
    #
    def recordJob(self, job):
        self.removeJob(job)
        lValues = [
            job.jobid,
            job.jobname,
            job.groupid,
            # values are delimited by double quotes in the statement, hence
            # the double quotes of the command are turned into single ones
            job.command.replace("\"", "\'"),
            job.launcher,
            job.queue,
            "waiting",
            time.strftime("%Y-%m-%d %H:%M:%S"),
            "?",
        ]
        sqlValues = ", ".join(["\"%s\"" % value for value in lValues])
        sqlCmd = "INSERT INTO %s VALUES ( %s );" % (job.tablename, sqlValues)
        self.execute(sqlCmd)

    ## Remove a job from the job table
    #
    # The job is identified by its groupid, jobname and queue.
    #
    # @param job job instance to remove
    #
    def removeJob(self, job):
        qry = "DELETE FROM %s WHERE groupid='%s' AND jobname='%s' AND queue='%s';" \
            % (job.tablename, job.groupid, job.jobname, job.queue)
        self.execute(qry)

    ## Set the jobid of a job with the id of SGE
    #
    # @param job job instance (identified by jobname, groupid and queue)
    # @param jobid integer, or any value accepted by int()
    #
    def setJobIdFromSge(self, job, jobid):
        qry = "UPDATE %s SET jobid='%i' WHERE jobname='%s' AND groupid='%s' AND queue='%s';" \
            % (job.tablename, int(jobid), job.jobname, job.groupid, job.queue)
        self.execute(qry)

    ## Get a job status
    #
    # Side effect: when the job has a non-zero jobid but an empty jobname,
    # the jobid is moved into jobname and jobid is reset to 0 before querying.
    # The process exits with status 1 if several rows match the job.
    #
    # @param job a Job instance with the job informations
    # @return the stored status, or "unknown" if no row matches
    #
    def getJobStatus(self, job):
        if job.jobid != 0 and job.jobname == "":
            job.jobname = job.jobid
            job.jobid = 0
        qry = "SELECT status FROM %s WHERE groupid='%s' AND jobname='%s' AND queue='%s';" \
            % (job.tablename, job.groupid, job.jobname, job.queue)
        self.execute(qry)
        res = self.fetchall()
        # test the empty/None case first: calling len() on None would raise
        if not res:
            return "unknown"
        if len(res) > 1:
            msg = "ERROR while getting job status: non-unique jobs"
            sys.stderr.write("%s\n" % msg)
            sys.stderr.flush()
            sys.exit(1)
        return res[0][0]

    ## Change a job status
    #
    # The node column is updated as well, from job.node.
    #
    # @param job a Job instance with the job informations
    # @param status the new status (waiting,finished,error)
    # @param method db or file (not used by this implementation)
    #
    def changeJobStatus(self, job, status, method=""):
        sqlCmd = "UPDATE %s SET status='%s',node='%s' WHERE groupid='%s' AND jobname='%s' AND queue='%s';" \
            % (job.tablename, status, job.node, job.groupid, job.jobname, job.queue)
        self.execute(sqlCmd)

    ## Get the number of jobs belonging to the desired groupid with the desired status.
    #
    # @param tablename string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus(self, tablename, groupid, status):
        qry = "SELECT count(jobname) FROM %s WHERE groupid='%s' AND status='%s';" \
            % (tablename, groupid, status)
        self.execute(qry)
        res = self.fetchall()
        return int(res[0][0])

    ## Clean all jobs from a job group
    #
    # Nothing is done if the table does not exist.
    #
    # @param tablename table name to record the jobs
    # @param groupid a group identifier to record related job series
    #
    def cleanJobGroup(self, tablename, groupid):
        if self.doesTableExist(tablename):
            qry = "DELETE FROM %s WHERE groupid='%s';" % (tablename, groupid)
            self.execute(qry)

    ## Check if there is unfinished job from a job group.
    #
    # @param tablename string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @return True if at least one job of the group has a status different
    #         from "finished", False otherwise or if the table does not exist
    #
    def hasUnfinishedJob(self, tablename, groupid):
        if not self.doesTableExist(tablename):
            return False
        qry = "SELECT * FROM %s WHERE groupid='%s' and status!='finished';" \
            % (tablename, groupid)
        self.execute(qry)
        # an empty (or missing) result set means every job is finished
        return bool(self.fetchall())

    ## Check if a job is still handled by SGE
    #
    # The output of 'qstat' is redirected to the file "qstat_stdout" in the
    # current directory, parsed, and the file is always removed afterwards.
    # The process exits with status 1 if 'qstat' returns a non-zero status.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return True if a line of the 'qstat' output matches the job
    #
    def isJobStillHandledBySge(self, jobid, jobname):
        qstatFile = "qstat_stdout"
        returnStatus = os.system("qstat > %s" % (qstatFile))
        if returnStatus != 0:
            # the shell redirection may have created the file anyway
            if os.path.exists(qstatFile):
                os.remove(qstatFile)
            msg = "ERROR while launching 'qstat'"
            sys.stderr.write("%s\n" % msg)
            sys.exit(1)
        try:
            with open(qstatFile, "r") as qstatFileHandler:
                for line in qstatFileHandler:
                    tokens = line.split()
                    # 1st field is compared to the job id; the 3rd one only
                    # needs to be a prefix of the job name (presumably because
                    # qstat truncates long names - TODO confirm)
                    if len(tokens) > 3 \
                            and tokens[0] == str(jobid) \
                            and jobname.startswith(tokens[2]):
                        return True
            return False
        finally:
            # remove the temporary file even if the parsing fails
            os.remove(qstatFile)

    ## Wait job finished status from a job group.
    # According to the original note, jobs in error are re-launched
    # (up to maxRelaunch times); this is done by the adaptator.
    #
    # The work is delegated to the adaptator built by TableJobAdaptatorFactory.
    #
    # @param tableName string table name to record the jobs
    # @param groupid string a group identifier to record related job series
    # @param checkInterval integer time laps in seconds between two checks (default = 5)
    # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3)
    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
    #
    def waitJobGroup(self, tableName, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        iTJA = TableJobAdaptatorFactory.createInstance(self, tableName)
        iTJA.waitJobGroup(groupid, checkInterval, maxRelaunch, exitIfTooManyErrors, timeOutPerJob)

    ## Submit a job to a queue and record it in job table.
    #
    # The work is delegated to the adaptator built by TableJobAdaptatorFactory.
    #
    # @param job a job instance
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
    # @param checkInterval integer time laps in seconds between two checks (default = 30)
    # @return the value returned by the adaptator's submitJob()
    #
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        iTJA = TableJobAdaptatorFactory.createInstance(self, job.tablename)
        return iTJA.submitJob(job, verbose, maxNbWaitingJobs, checkInterval)

    ## Get the list of nodes where jobs of one group were executed
    #
    # @param tableName string table name where jobs are recorded
    # @param groupId string a group identifier of job series
    # @return lNodes list of nodes names (one item per row, duplicates kept)
    #
    def getNodesListByGroupId(self, tableName, groupId):
        qry = "SELECT node FROM %s WHERE groupid='%s'" % (tableName, groupId)
        self.execute(qry)
        return [resTuple[0] for resTuple in self.fetchall()]

    ## Get the name of the database handler
    #
    # @return the string "DbMySql"
    #
    def getDbName(self):
        return "DbMySql"

    ## Split a composite job identifier
    #
    # The identifier is expected to look like "<jobid>.<nbJob>" optionally
    # followed by ":<anything>". An IndexError is raised if it has no ".".
    #
    # @param jobid string composite job identifier
    # @return tuple of strings (jobid, nbJob)
    #
    def _getJobidAndNbJob(self, jobid):
        lFields = jobid.split(".")
        nbJob = lFields[1].split(":")[0]
        return lFields[0], nbJob
|