Mercurial > repos > yufei-luo > s_mart
comparison commons/core/sql/RepetJob.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:ea3082881bf8 | 6:769e306b7933 |
---|---|
1 # Copyright INRA (Institut National de la Recherche Agronomique) | |
2 # http://www.inra.fr | |
3 # http://urgi.versailles.inra.fr | |
4 # | |
5 # This software is governed by the CeCILL license under French law and | |
6 # abiding by the rules of distribution of free software. You can use, | |
7 # modify and/ or redistribute the software under the terms of the CeCILL | |
8 # license as circulated by CEA, CNRS and INRIA at the following URL | |
9 # "http://www.cecill.info". | |
10 # | |
11 # As a counterpart to the access to the source code and rights to copy, | |
12 # modify and redistribute granted by the license, users are provided only | |
13 # with a limited warranty and the software's author, the holder of the | |
14 # economic rights, and the successive licensors have only limited | |
15 # liability. | |
16 # | |
17 # In this respect, the user's attention is drawn to the risks associated | |
18 # with loading, using, modifying and/or developing or reproducing the | |
19 # software by the user in light of its specific status of free software, | |
20 # that may mean that it is complicated to manipulate, and that also | |
21 # therefore means that it is reserved for developers and experienced | |
22 # professionals having in-depth computer knowledge. Users are therefore | |
23 # encouraged to load and test the software's suitability as regards their | |
24 # requirements in conditions enabling the security of their systems and/or | |
25 # data to be ensured and, more generally, to use and operate it in the | |
26 # same conditions as regards security. | |
27 # | |
28 # The fact that you are presently reading this means that you have had | |
29 # knowledge of the CeCILL license and that you accept its terms. | |
30 | |
31 | |
32 import os | |
33 import time | |
34 import sys | |
35 from commons.core.sql.DbMySql import DbMySql | |
36 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory | |
37 | |
#TODO: to remove... => replace all RepetJob() by TableJobAdaptator()...
## Methods for Job persistence
#
# Legacy helper that stores and queries jobs in a MySQL job table.
# Queries are built by string interpolation and run through the
# execute / fetchall / doesTableExist methods inherited from DbMySql.
# Waiting for a job group and submitting jobs are delegated to the adaptor
# returned by TableJobAdaptatorFactory.createInstance().
#
# NOTE(review): values are interpolated into SQL without escaping, so a
# job name, group id or queue containing a quote character will break the
# query.
#
class RepetJob( DbMySql ):


    ## Record a job, replacing any existing row with the same
    # (groupid, jobname, queue).
    #
    # The new row gets the status "waiting", a timestamp of the current local
    # time, and "?" as its last value (presumably the node column, which
    # changeJobStatus fills in later -- confirm against the table schema).
    #
    # @param job Job instance with the job information (tablename, jobid,
    #        jobname, groupid, command, launcher, queue)
    #
    def recordJob( self, job ):
        self.removeJob( job )
        sqlCmd = "INSERT INTO %s" % ( job.tablename )
        sqlCmd += " VALUES ("
        sqlCmd += " \"%s\"," % ( job.jobid )
        sqlCmd += " \"%s\"," % ( job.jobname )
        sqlCmd += " \"%s\"," % ( job.groupid )
        # Every value is wrapped in double quotes, so a double quote inside
        # the command would end the SQL literal early: swap them for single
        # quotes.
        sqlCmd += " \"%s\"," % ( job.command.replace("\"","\'") )
        sqlCmd += " \"%s\"," % ( job.launcher )
        sqlCmd += " \"%s\"," % ( job.queue )
        sqlCmd += " \"waiting\","
        sqlCmd += " \"%s\"," % ( time.strftime( "%Y-%m-%d %H:%M:%S" ) )
        sqlCmd += " \"?\" );"
        self.execute( sqlCmd )


    ## Remove a job from its job table.
    #
    # Every row matching the job's groupid, jobname and queue is deleted.
    #
    # @param job Job instance to remove (uses tablename, groupid, jobname,
    #        queue)
    #
    def removeJob( self, job ):
        qry = "DELETE FROM %s" % ( job.tablename )
        qry += " WHERE groupid='%s'" % ( job.groupid )
        qry += " AND jobname='%s'" % ( job.jobname )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )


    ## Set the jobid of a job to the identifier given by SGE.
    #
    # @param job Job instance (uses tablename, jobname, groupid, queue)
    # @param jobid integer (or any value int() accepts) job identifier
    #
    def setJobIdFromSge( self, job, jobid ):
        qry = "UPDATE %s" % ( job.tablename )
        qry += " SET jobid='%i'" % ( int(jobid) )
        qry += " WHERE jobname='%s'" % ( job.jobname )
        qry += " AND groupid='%s'" % ( job.groupid )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )


    ## Get a job status.
    #
    # If the job has a non-zero jobid but an empty jobname, the jobid is moved
    # into jobname and jobid is reset to 0 before the lookup. Note that this
    # modifies the given Job instance.
    #
    # @param job Job instance with the job information
    # @return string the status stored in the table, or "unknown" when no row
    #         matches; the process exits with status 1 if several rows match
    #
    def getJobStatus( self, job ):
        if job.jobid != 0 and job.jobname == "":
            job.jobname = job.jobid
            job.jobid = 0
        qry = "SELECT status FROM %s" % ( job.tablename )
        qry += " WHERE groupid='%s'" % ( job.groupid )
        qry += " AND jobname='%s'" % ( job.jobname )
        qry += " AND queue='%s';" % ( job.queue )
        self.execute( qry )
        res = self.fetchall()
        # Handle "no result" (empty or None) first: len() would raise on None.
        if not res:
            return "unknown"
        if len(res) > 1:
            msg = "ERROR while getting job status: non-unique jobs"
            sys.stderr.write( "%s\n" % msg )
            sys.stderr.flush()
            sys.exit(1)
        return res[0][0]


    ## Change a job status and record the node of the job.
    #
    # @param job Job instance with the job information (its node attribute is
    #        written as well)
    # @param status string the new status (e.g. waiting, running, finished,
    #        error)
    # @param method unused; kept so existing callers passing "db" or "file"
    #        keep working
    #
    def changeJobStatus( self, job, status, method=""):
        sqlCmd = "UPDATE %s" % ( job.tablename )
        sqlCmd += " SET status='%s'" % ( status )
        sqlCmd += ",node='%s'" % ( job.node )
        sqlCmd += " WHERE groupid='%s'" % ( job.groupid )
        sqlCmd += " AND jobname='%s'" % ( job.jobname )
        sqlCmd += " AND queue='%s';" % ( job.queue )
        self.execute( sqlCmd )


    ## Get the number of jobs belonging to the desired groupid with the
    # desired status.
    #
    # @param tablename string table name where the jobs are recorded
    # @param groupid string a group identifier of related job series
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus( self, tablename, groupid, status ):
        qry = "SELECT count(jobname) FROM %s" % ( tablename )
        qry += " WHERE groupid='%s'" % ( groupid )
        qry += " AND status='%s';" % ( status )
        self.execute( qry )
        res = self.fetchall()
        return int( res[0][0] )


    ## Delete all jobs of a job group (does nothing if the table is missing).
    #
    # @param tablename string table name where the jobs are recorded
    # @param groupid string a group identifier of related job series
    #
    def cleanJobGroup( self, tablename, groupid ):
        if self.doesTableExist( tablename ):
            qry = "DELETE FROM %s WHERE groupid='%s';" % ( tablename, groupid )
            self.execute( qry )


    ## Check if a job group still has jobs whose status is not 'finished'.
    #
    # @param tablename string table name where the jobs are recorded
    # @param groupid string a group identifier of related job series
    # @return boolean False if the table does not exist or every job of the
    #         group is finished
    #
    def hasUnfinishedJob( self, tablename, groupid ):
        if not self.doesTableExist( tablename ):
            return False
        qry = "SELECT * FROM %s" % ( tablename )
        qry += " WHERE groupid='%s'" % ( groupid )
        qry += " and status!='finished';"
        self.execute( qry )
        res = self.fetchall()
        return bool( res )


    ## Check if a job is still listed by SGE's 'qstat'.
    #
    # qstat's output is read directly from a pipe, so no temporary file is
    # written in the working directory. This means concurrent callers cannot
    # clobber each other's output.
    # The process exits with status 1 if qstat cannot be run or fails.
    #
    # @param jobid string job identifier (compared to the 1st qstat column)
    # @param jobname string job name (compared to the 3rd qstat column)
    # @return boolean
    #
    def isJobStillHandledBySge( self, jobid, jobname ):
        import subprocess
        qstatOutput = ""
        try:
            proc = subprocess.Popen( ["qstat"], stdout=subprocess.PIPE, universal_newlines=True )
            qstatOutput = proc.communicate()[0]
            returnStatus = proc.returncode
        except OSError:
            # qstat missing or not executable
            returnStatus = -1
        if returnStatus != 0:
            msg = "ERROR while launching 'qstat'"
            sys.stderr.write( "%s\n" % msg )
            sys.exit(1)
        for line in qstatOutput.splitlines():
            tokens = line.split()
            # qstat may truncate long job names, so only the displayed prefix
            # of jobname is compared.
            if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
                return True
        return False


    ## Wait until the jobs of a job group are finished.
    # According to this method's original documentation, jobs in error are
    # re-launched (at most maxRelaunch times). The waiting logic itself lives
    # in the adaptor returned by TableJobAdaptatorFactory.
    #
    # @param tableName string table name where the jobs are recorded
    # @param groupid string a group identifier of related job series
    # @param checkInterval integer interval in seconds between two checks (default = 5)
    # @param maxRelaunch integer max nb of times a job in error is relaunched before exiting (default = 3)
    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
    # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h)
    #
    def waitJobGroup(self, tableName, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        iTJA = TableJobAdaptatorFactory.createInstance(self, tableName)
        iTJA.waitJobGroup(groupid, checkInterval, maxRelaunch, exitIfTooManyErrors, timeOutPerJob)


    ## Submit a job to a queue and record it in the job table.
    #
    # @param job a Job instance
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
    # @param checkInterval integer interval in seconds between two checks (default = 30)
    # @return whatever the adaptor's submitJob returns
    #
    def submitJob( self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30 ):
        iTJA = TableJobAdaptatorFactory.createInstance(self, job.tablename)
        return iTJA.submitJob(job, verbose, maxNbWaitingJobs, checkInterval)


    ## Get the list of nodes where jobs of one group were executed.
    #
    # @param tableName string table name where jobs are recorded
    # @param groupId string a group identifier of job series
    # @return list of node names, one entry per job row (duplicates kept)
    #
    def getNodesListByGroupId( self, tableName, groupId ):
        qry = "SELECT node FROM %s" % tableName
        qry += " WHERE groupid='%s'" % groupId
        self.execute( qry )
        res = self.fetchall()
        return [ row[0] for row in res ]


    ## Return the constant name "DbMySql".
    #
    def getDbName(self):
        return "DbMySql"


    ## Split an identifier of the form "<id>.<x>:<y>" and return (id, x).
    #
    # Example: "123.4:5" -> ("123", "4"). Both parts are returned as strings.
    # Presumably an SGE array-job identifier -- confirm with callers.
    # Raises IndexError if the identifier contains no '.'.
    #
    def _getJobidAndNbJob(self, jobid) :
        parts = jobid.split(".")
        nbJob = parts[1].split(":")[0]
        return parts[0], nbJob