Mercurial > repos > yufei-luo > s_mart
comparison commons/core/sql/JobAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
5:ea3082881bf8 | 6:769e306b7933 |
---|---|
1 # Copyright INRA (Institut National de la Recherche Agronomique) | |
2 # http://www.inra.fr | |
3 # http://urgi.versailles.inra.fr | |
4 # | |
5 # This software is governed by the CeCILL license under French law and | |
6 # abiding by the rules of distribution of free software. You can use, | |
7 # modify and/ or redistribute the software under the terms of the CeCILL | |
8 # license as circulated by CEA, CNRS and INRIA at the following URL | |
9 # "http://www.cecill.info". | |
10 # | |
11 # As a counterpart to the access to the source code and rights to copy, | |
12 # modify and redistribute granted by the license, users are provided only | |
13 # with a limited warranty and the software's author, the holder of the | |
14 # economic rights, and the successive licensors have only limited | |
15 # liability. | |
16 # | |
17 # In this respect, the user's attention is drawn to the risks associated | |
18 # with loading, using, modifying and/or developing or reproducing the | |
19 # software by the user in light of its specific status of free software, | |
20 # that may mean that it is complicated to manipulate, and that also | |
21 # therefore means that it is reserved for developers and experienced | |
22 # professionals having in-depth computer knowledge. Users are therefore | |
23 # encouraged to load and test the software's suitability as regards their | |
24 # requirements in conditions enabling the security of their systems and/or | |
25 # data to be ensured and, more generally, to use and operate it in the | |
26 # same conditions as regards security. | |
27 # | |
28 # The fact that you are presently reading this means that you have had | |
29 # knowledge of the CeCILL license and that you accept its terms. | |
30 | |
31 | |
32 import os | |
33 import time | |
34 import sys | |
35 import tempfile | |
36 import subprocess | |
37 from commons.core.sql.Job import Job | |
38 | |
39 ## Methods for Job persistence | |
40 # | |
class JobAdaptator(object):

    ## Constructor
    #
    # @param lJob list of job identifiers already known to this adaptator (default = new empty list)
    # @param table string stored as-is; not used by any method of this class
    #
    def __init__(self, lJob = None, table = "" ):
        # A fresh list per instance: a shared default list would make every
        # adaptator see the jobs recorded by all the others.
        if lJob is None:
            lJob = []
        self._lJobID = lJob
        self._table = table
        # Prefix of the last submitted job name, used to filter the qstat output.
        self._acronym = ""

    ## Record a job
    #
    # @param job value appended to the internal job list (submitJob() passes the job id)
    #
    def recordJob(self, job):
        self._lJobID.append(job)

    ## Remove a job from the job table (no-op in this class)
    #
    # @param job: job instance to remove
    #
    def removeJob(self, job):
        pass

    ## Set the jobid of a job with the id of SGE (no-op in this class)
    #
    # @param job job instance
    # @param jobid integer
    #
    def updateJobIdInDB(self, job, jobid):
        pass

    ## Get a job status (no-op in this class, returns None)
    #
    # @param job: a Job instance with the job informations
    #
    def getJobStatus(self, job):
        pass

    ## Change a job status (no-op in this class)
    #
    # @param job: a Job instance with the job informations
    # @param status: the new status (waiting,finished,error)
    #
    def changeJobStatus(self, job, status):
        pass

    ## Get the number of jobs belonging to the desired groupid with the desired status
    # (no-op in this class, returns None)
    #
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    #
    def getCountStatus(self, groupid, status):
        pass

    ## Clean all job from a job group (no-op in this class)
    #
    # @param groupid: a group identifier to record related job series
    #
    def cleanJobGroup(self, groupid):
        pass

    ## Check if there is unfinished job from a job group (no-op in this class, returns None)
    #
    # @param groupid string a group identifier to record related job series
    #
    def hasUnfinishedJob(self, groupid):
        pass

    ## Get the first column of every 'qstat' line matching the current acronym
    #
    # @return list of strings (empty if the 'qstat | grep' pipeline returns a non-zero code)
    #
    def _getJobIDListFromQstat(self):
        lJobIDFromQstat = []
        tmp = tempfile.NamedTemporaryFile(delete=False)
        try:
            cmd = "qstat | grep %s" % self._acronym
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode == 0:
                with open(tmp.name, "r") as jobidFileHandler:
                    for line in jobidFileHandler:
                        tokens = line.split()
                        if tokens:
                            lJobIDFromQstat.append(tokens[0])
        finally:
            # Always remove the temporary file: a non-zero return code (e.g.
            # grep matching nothing) must not leave it behind.
            tmp.close()
            os.remove(tmp.name)
        return lJobIDFromQstat

    ## Check if at least one recorded job id is present in the ids read from qstat
    #
    # @param lJobID list of job ids (anything accepted by int())
    # @param lJobIDFromQstat list of strings, first column of the qstat lines
    # @return boolean
    #
    def _areJobsStillRunning(self,lJobID,lJobIDFromQstat):
        sJobIDFromQstat = set()
        for token in lJobIDFromQstat:
            try:
                sJobIDFromQstat.add(int(token))
            except ValueError:
                # Not a job id (e.g. a header line matched by grep): ignore it.
                continue
        for jobid in lJobID:
            if int(jobid) in sJobIDFromQstat:
                return True
        return False

    ## Wait until none of the recorded jobs is listed by qstat anymore.
    #
    # The method sleeps 'checkInterval' seconds before each check. The other
    # parameters are accepted but not used by this implementation.
    #
    # @param groupid string a group identifier to record related job series (unused)
    # @param checkInterval integer time laps in seconds between two checks (default = 5)
    # @param maxRelaunch integer (unused)
    # @param exitIfTooManyErrors boolean (unused)
    # @param timeOutPerJob integer (unused)
    #
    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
        while True:
            time.sleep(checkInterval)
            lJobIDFromQstat = self._getJobIDListFromQstat()
            if not self._areJobsStillRunning(self._lJobID, lJobIDFromQstat):
                break

    ## Submit a job to a queue and record its id.
    #
    # The command comes from self._getQsubCommand() and the job id is parsed by
    # self._getJobidFromJobManager(); neither is defined in this class, so they
    # must be provided by a subclass.
    #
    # @param job a job instance (its 'jobid' attribute is set on success)
    # @param verbose integer (default = 0)
    # @param maxNbWaitingJobs integer (unused)
    # @param checkInterval integer (unused)
    # @return integer return code of the submission command
    #
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        cmd = self._getQsubCommand(job)
        tmp = tempfile.NamedTemporaryFile(delete=False)
        try:
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode == 0:
                with open(tmp.name, "r") as jobidFileHandler:
                    jobid = self._getJobidFromJobManager(jobidFileHandler)
                if verbose > 0:
                    # Single-argument print(...) behaves the same as the print statement.
                    print("job '%i %s' submitted" % (jobid, job.jobname))
                    sys.stdout.flush()
                job.jobid = jobid
                # The acronym is what _getJobIDListFromQstat() greps for.
                self._acronym = job.jobname.split("_")[0][:10]
                self.recordJob(job.jobid)
        finally:
            # Remove the temporary file whether the submission succeeded or not.
            tmp.close()
            os.remove(tmp.name)
        return process.returncode

    ## Get the list of nodes where jobs of one group were executed
    # (no-op in this class, returns None)
    #
    # @param groupid string a group identifier of job series
    #
    def getNodesListByGroupId(self, groupId):
        pass

    ## No-op in this class
    #
    def checkJobTable(self):
        pass

    ## No-op in this class
    #
    def close(self):
        pass

    ## Split a string of the form "<jobid>.<nbJob>:<rest>"
    #
    # @param jobid string containing at least one "."
    # @return tuple (text before the first ".", text between the first "." and the next ":" or ".")
    #
    def _getJobidAndNbJob(self, jobid) :
        tab = jobid.split(".")
        jobid = tab[0]
        tab = tab[1].split(":")
        nbJob = tab[0]
        return jobid, nbJob
201 | |
class JobAdaptatorSGE(JobAdaptator):

    ## Check if a job is still handled by SGE
    #
    # Runs 'qstat' and looks for a line whose first column equals the job id
    # and whose third column is a prefix of the job name. If 'qstat' returns a
    # non-zero code, an error is written on stderr and the program exits with
    # status 1.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return boolean
    #
    def isJobStillHandledBySge(self, jobid, jobname):
        isJobInQstat = False
        tmp = tempfile.NamedTemporaryFile(delete=False)
        qstatFile = tmp.name
        try:
            cmd = "qstat"
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode != 0:
                msg = "ERROR while launching 'qstat'"
                sys.stderr.write( "%s\n" % msg )
                sys.exit(1)
            with open(qstatFile, "r") as qstatFileHandler:
                for line in qstatFileHandler:
                    tokens = line.split()
                    # The name column is compared as a prefix of the full job name.
                    if len(tokens) > 3 and tokens[0] == str(jobid) and jobname.startswith(tokens[2]):
                        isJobInQstat = True
                        break
        finally:
            # Runs on sys.exit() too, so the temporary file never leaks.
            tmp.close()
            os.remove(qstatFile)
        return isJobInQstat

    ## Build the shell command submitting a job with 'qsub'
    #
    # The launcher is echoed into 'qsub -V -N <jobname> [-q <queue>] -cwd
    # [-l "<resources>"] [-pe <parallelEnvironment>]'.
    #
    # @param job object with attributes launcher, jobname, queue, lResources, parallelEnvironment
    # @return string
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        prg = "qsub"
        cmd += prg
        cmd += " -V"
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        cmd += " -cwd"
        if job.lResources != []:
            cmd += " -l \""
            cmd += " ".join(job.lResources)
            cmd += "\""
        if job.parallelEnvironment != "":
            cmd += " -pe " + job.parallelEnvironment
        return cmd

    ## Read the job id from the output of the submission command
    #
    # Takes the third space-separated field of the first line
    # (presumably a line like "Your job <id> ..." -- to be confirmed against qsub).
    #
    # @param jobidFileHandler file handler on the captured output
    # @return integer
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(" ")[2])
251 | |
252 | |
class JobAdaptatorTorque(JobAdaptator):

    ## Build the shell command submitting a job with 'qsub'
    #
    # The launcher is echoed into 'qsub -V -d <current directory> -N <jobname>
    # [-q <queue>] [-l "<resources>"]', where "mem_free" is replaced by "mem"
    # in the resources.
    #
    # @param job object with attributes launcher, jobname, queue, lResources
    # @return string
    #
    def _getQsubCommand(self, job):
        lParts = [
            "echo '%s' | " % job.launcher,
            "qsub",
            " -V",
            " -d %s" % os.getcwd(),
            " -N %s" % job.jobname,
        ]
        if job.queue != "":
            lParts.append(" -q %s" % job.queue)
        if job.lResources != []:
            resources = " ".join(job.lResources).replace("mem_free","mem")
            lParts.append(" -l \"" + resources + "\"")
        return "".join(lParts)

    ## Read the job id from the output of the submission command
    #
    # Takes the text before the first "." of the first line.
    #
    # @param jobidFileHandler file handler on the captured output
    # @return integer
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        firstLine = jobidFileHandler.readline()
        return int(firstLine.split(".")[0])