18
|
1 from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
|
|
2 from commons.core.stat.Stat import Stat
|
|
3 from commons.core.launcher.WriteScript import WriteScript
|
|
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
|
|
5 from commons.core.sql.Job import Job
|
|
6 import stat
|
|
7 import os
|
|
8 import re
|
|
9 import sys
|
|
10 import time
|
|
11 import glob
|
|
12
|
|
class LauncherParameter(object):
    """Container for the settings a Launcher2 instance is built from.

    Holds a job database handle together with working/temporary
    directories, a queue string, a job group id and an acronym. The query,
    subject, param and jobTable values can be set but expose no getter.
    """

    def __init__(self, jobDB):
        """Store the job database handle; every other field is set via setters.

        @param jobDB job database handle handed over to Launcher2
        """
        self._jobDB = jobDB

    # --- write-only settings -------------------------------------------

    def setQuery(self, query):
        self._query = query

    def setSubject(self, subject):
        self._subject = subject

    def setParam(self, param):
        self._param = param

    def setJobTable(self, jobTable):
        self._jobTable = jobTable

    # --- read/write settings -------------------------------------------

    def getJobDB(self):
        return self._jobDB

    def setCurrentDir(self, currentDir):
        self._currentDir = currentDir

    def getCurrentDir(self):
        return self._currentDir

    def setTempDir(self, tempDir):
        self._tempDir = tempDir

    def getTempDir(self):
        return self._tempDir

    def setQueue(self, queue):
        self._queue = queue

    def getQueue(self):
        return self._queue

    def setGroupId(self, groupId):
        self._groupId = groupId

    def getGroupId(self):
        return self._groupId

    def setAcronym(self, acronym):
        self._acronym = acronym

    def getAcronym(self):
        return self._acronym

    @staticmethod
    def createParameter(jobdb, groupid, acronym):
        """Return a LauncherParameter filled with default values.

        The query, current directory and temporary directory are set to the
        current working directory; subject, param, job table and queue are
        set to the empty string.

        @param jobdb job database handle
        @param groupid job group identifier
        @param acronym acronym used to name jobs
        @return a new LauncherParameter
        """
        workingDir = os.getcwd()
        result = LauncherParameter(jobdb)
        defaults = (
            (result.setQuery, workingDir),
            (result.setSubject, ""),
            (result.setParam, ""),
            (result.setCurrentDir, workingDir),
            (result.setTempDir, workingDir),
            (result.setJobTable, ""),
            (result.setQueue, ""),
            (result.setGroupId, groupid),
            (result.setAcronym, acronym),
        )
        for setter, value in defaults:
            setter(value)
        return result
|
|
76
|
|
77
|
|
class Launcher2(object):
    """Submit groups of jobs through a job table and wait for their completion.

    Each job gets a generated launcher script named
    ``<cdir>/ClusterLauncher_<groupid>_<jobname>_<Y>-<M>-<D>_<pid>.py``,
    written by WriteScript and submitted with ``self.jobdb.submitJob``.
    """

    # TODO: LauncherParameter also carries query/subject/param/jobTable values
    # that this class never reads; consider removing them.
    def __init__(self, iLauncherParameter):
        """Configure the launcher from a LauncherParameter instance.

        @param iLauncherParameter LauncherParameter providing the job DB,
               directories, queue string, group id and acronym
        """
        jobdb = iLauncherParameter.getJobDB()
        cdir = iLauncherParameter.getCurrentDir()
        # Objects whose class is named "RepetJob" are wrapped into a table-job
        # adaptor on the "jobs" table; anything else is used as-is.
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = iLauncherParameter.getTempDir()
        self.groupid = iLauncherParameter.getGroupId()
        self.acronyme = iLauncherParameter.getAcronym()
        self._chooseTemplateWithCopy = False
        self._chooseTemplateLight = False
        self.queue, self.lResources = self.getQueueNameAndResources(iLauncherParameter.getQueue())
        self._createJobInstance()
        self._nbJobs = 0

    @staticmethod
    def _timestamp():
        """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
        return time.strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _log(message):
        """Print a message and flush stdout right away."""
        print(message)
        sys.stdout.flush()

    @staticmethod
    def _joinCmds(lCmds, separator):
        """Concatenate commands, appending `separator` after each one."""
        return "".join("%s%s" % (cmd, separator) for cmd in lCmds)

    def getQueueNameAndResources(self, configQueue):
        """Split a queue configuration string into a queue name and resources.

        Single quotes are removed and the string is split on whitespace.
        If the first token contains ".q" (e.g. "all.q"), it is the queue
        name and the remaining tokens are resources; otherwise every token
        is a resource.

        @param configQueue string such as "all.q mem_free=1G" (may be empty)
        @return tuple (queueName, lResources); queueName is "" when absent
        """
        # split() without argument ignores leading/repeated whitespace, so
        # " all.q" or "all.q  mem_free=1G" no longer yield empty tokens.
        tokens = configQueue.replace("'", "").split()
        if not tokens:
            return "", []
        if re.match(r".*\.q", tokens[0]):
            return tokens[0], tokens[1:]
        return "", tokens

    def createGroupidIfItNotExist(self):
        """Set self.job.groupid to self.groupid, or to the process id if that is empty."""
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    def beginRun( self ):
        """Prepare the job group: wait for unfinished jobs, else clean the group."""
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    def runSingleJob(self, cmdStart, cmdFinish = "", cmdSize = "", cmdCopy = ""):
        """Write a launcher script for self.job and submit it.

        @param cmdStart string command-line for the job to be launched
        @param cmdFinish string command to retrieve result files
        @param cmdSize string passed through to WriteScript.run
        @param cmdCopy string passed through to WriteScript.run
        @warning the jobname has to be defined outside from this method
        Exits the process with status 1 when submitJob returns non-zero.
        """
        # A direct single-job run is counted as one job for endRun's messages.
        if self._nbJobs == 0:
            self._nbJobs = 1
        pid = str(os.getpid())
        now = time.localtime()
        # Date parts are not zero-padded (e.g. 2024-3-5).
        # TODO: rename ClusterLauncher_ ...
        pyFileName = "%s/ClusterLauncher_%s_%s_%s-%s-%s_%s.py" % (
            self.cdir, self.job.groupid, self.job.jobname, now[0], now[1], now[2], pid)
        self.job.launcher = pyFileName

        # TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir, self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        # rwx for owner, r-x for group and others (0755).
        os.chmod(pyFileName, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    def endRun(self, cleanNodes = False):
        """Wait for the current job group, report timings, then clean the group.

        @param cleanNodes if True, call self.cleanNodes() after the wait
        """
        self._log("waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, self._timestamp()))
        self.jobdb.waitJobGroup(self.job.groupid)
        if self._nbJobs > 1:
            self._log("all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, self._timestamp()))

        if cleanNodes:
            self._log("start cleaning cluster nodes (%s)" % self._timestamp())
            self.cleanNodes()
            self._log("end cleaning cluster nodes (%s)" % self._timestamp())

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
        print("execution time per job: %s" % statsExecutionTime.string())
        sys.stdout.flush()
        self.jobdb.cleanJobGroup(self.job.groupid)

    def getStatsOfExecutionTime(self, acronyme = ""):
        """Collect the 'executionTime=<seconds>' values from job stdout files.

        Scans files matching '<acronyme>*.o*' in the current directory and
        adds the value of the first line containing "executionTime" in
        each file.

        @param acronyme file prefix; defaults to self.acronyme when ""
        @return Stat instance holding one value per file that had such a line
        """
        if acronyme == "":
            acronyme = self.acronyme
        iStat = Stat()
        for jobFile in glob.glob("%s*.o*" % acronyme):
            with open(jobFile, "r") as fH:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores surrounding whitespace, so the value
                        # is parsed correctly with or without a trailing newline.
                        iStat.add(float(line.split("=")[1]))
                        break
        return iStat

    def clean( self, acronyme = "", stdout = True, stderr = True ):
        """Remove launcher scripts and, optionally, job stdout/stderr files.

        @param acronyme file prefix; defaults to self.acronyme when ""
        @param stdout also remove '<acronyme>*.o*' files
        @param stderr also remove '<acronyme>*.e*' files
        """
        if acronyme == "":
            acronyme = self.acronyme
        lPatterns = ["ClusterLauncher*%s*.py" % acronyme]
        if stdout:
            lPatterns.append("%s*.o*" % acronyme)
        if stderr:
            lPatterns.append("%s*.e*" % acronyme)
        lFileToRemove = []
        seen = set()
        for pattern in lPatterns:
            for fileName in glob.glob(pattern):
                # A file may match several patterns; remove it only once.
                if fileName not in seen:
                    seen.add(fileName)
                    lFileToRemove.append(fileName)
        for fileName in lFileToRemove:
            os.remove(fileName)

    # TODO: handle of nodesMustBeCleaned => class attribute ?
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone = True, nodesMustBeCleaned = False):
        """Submit one job per command tuple, wait for all, then clean up.

        @param acronymPrefix job n (counting on from self._nbJobs) is named '<acronymPrefix>_<n>'
        @param lCmdsTuples sequence of (cmdStart[, cmdFinish[, cmdSize[, cmdCopy]]]);
               items beyond the fourth are ignored
        @param cleanMustBeDone remove scripts and job output files afterwards
        @param nodesMustBeCleaned passed to endRun as its cleanNodes flag
        """
        self.beginRun()
        self._log("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid, self._timestamp()))
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            # Items map positionally onto (cmdStart, cmdFinish, cmdSize, cmdCopy).
            self.runSingleJob(*cmdsTuple[:4])
            # Fresh Job for the next submission, with the same group id.
            self._createJobInstance()
            self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    def prepareCommands(self, lCmds, lCmdStart = (), lCmdFinish = (), lCmdSize = (), lCmdCopy = ()):
        """Join command lists into the indented strings expected by runSingleJob.

        @return tuple (cmdStart, cmdFinish, cmdSize, cmdCopy); cmdStart holds
                lCmdStart followed by lCmds. Start/finish commands end with
                "\\n\\t", size/copy commands with "\\n\\t\\t".
        """
        cmdStart = self._joinCmds(lCmdStart, "\n\t") + self._joinCmds(lCmds, "\n\t")
        cmdFinish = self._joinCmds(lCmdFinish, "\n\t")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    # TODO: to remove when refactoring is done
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart = (), lCmdFinish = (), lCmdSize = (), lCmdCopy = ()):
        """Same as prepareCommands, but start/finish commands end with a bare "\\n"."""
        cmdStart = self._joinCmds(lCmdStart, "\n") + self._joinCmds(lCmds, "\n")
        cmdFinish = self._joinCmds(lCmdFinish, "\n")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    def getSystemCommand(self, prg, lArgs):
        """Return the Python source line 'log = os.system("<prg> <args...>")'.

        Arguments are space-joined and embedded verbatim (no quoting or
        escaping), so they must not contain double quotes.
        """
        return 'log = os.system("%s")' % " ".join([prg] + list(lArgs))

    def cleanNodes(self):
        """Run CleanClusterNodesAfterRepet for this job group.

        Targets the nodes recorded for the group, in self.tmpdir, with the
        pattern '<groupid>*'. Uses self.job.groupid (the effective id, which
        may be the PID) rather than self.groupid: the latter stays "" when no
        group id was configured, which would turn the pattern into "*".
        """
        groupid = self.job.groupid
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(groupid))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % groupid)
        iCleanClusterNodeAfterRepet.run()

    # TODO: to remove when refactoring is done
    def _indentCmd(self, cmd):
        """Re-indent a multi-line command for the launcher script.

        The first line is kept as-is and each following line gets a leading
        tab; every line ends with "\\n". A trailing whitespace-only segment
        (as left by a final "\\n" or "\\n\\t") is dropped, but a non-blank last
        line without a trailing newline is now kept instead of being lost.
        """
        lCmd = cmd.split("\n")
        if len(lCmd) > 1 and lCmd[-1].strip() == "":
            lCmd = lCmd[:-1]
        return "%s\n" % lCmd[0] + "".join("\t%s\n" % line for line in lCmd[1:])

    def _createJobInstance(self):
        """(Re)create self.job for self.queue, passing self.lResources when non-empty."""
        if self.lResources:
            self.job = Job(queue=self.queue, lResources=self.lResources)
        else:
            # Original note: "To have mem_free=1G" -- presumably Job's default
            # resources when none are given; confirm in Job.
            self.job = Job(queue=self.queue)
|