6
|
1 from commons.tools.CleanClusterNodesAfterRepet import CleanClusterNodesAfterRepet
|
|
2 from commons.core.stat.Stat import Stat
|
|
3 from commons.core.launcher.WriteScript import WriteScript
|
|
4 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
|
|
5 from commons.core.sql.Job import Job
|
|
6 import stat
|
|
7 import os
|
|
8 import re
|
|
9 import sys
|
|
10 import time
|
|
11 import glob
|
|
12
|
|
class Launcher(object):
    """Submit shell commands as cluster jobs and wait for their completion.

    For every job a ``ClusterLauncher_*.py`` script is written by WriteScript
    in ``cdir`` and submitted through the job table adaptor ``jobdb``. All jobs
    launched by one Launcher share a group id, which is used to wait for them,
    to gather execution-time statistics and to clean up afterwards.
    """

    #TODO: remove unused parameters : query="", subject="", param="", job_table=""
    def __init__(self, jobdb, query="", subject="", param="", cdir="",
                 tmpdir="", job_table="", queue="", groupid="", acro="X",
                 chooseTemplateWithCopy=False, chooseTemplateLight=False):
        """
        @param jobdb: job table adaptor; a RepetJob instance is converted with
            TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        @param cdir: directory where launcher scripts are written (default: cwd)
        @param tmpdir: temporary directory given to WriteScript and used when
            cleaning the cluster nodes
        @param queue: queue configuration, e.g. "all.q mem_free=1G"
            (see getQueueNameAndResources)
        @param groupid: job group id; the current pid is used when empty
        @param acro: acronym used to build job names and output file patterns
        @param chooseTemplateWithCopy: forwarded to WriteScript
        @param chooseTemplateLight: forwarded to WriteScript
        """
        if jobdb.__class__.__name__ == "RepetJob":
            self.jobdb = TableJobAdaptatorFactory.createInstance(jobdb, "jobs")
        else:
            self.jobdb = jobdb
        self.jobdb.checkJobTable()
        if cdir == "":
            cdir = os.getcwd()
        self.cdir = cdir
        self.tmpdir = tmpdir
        self.groupid = groupid
        self.acronyme = acro
        self._chooseTemplateWithCopy = chooseTemplateWithCopy
        self._chooseTemplateLight = chooseTemplateLight
        self.queue, self.lResources = self.getQueueNameAndResources(queue)
        self._createJobInstance()
        self._nbJobs = 0

    def getQueueNameAndResources(self, configQueue):
        """Split a queue configuration string into a queue name and resources.

        Single quotes are removed and the string is split on whitespace. When
        the first token contains ".q" it is taken as the queue name and the
        remaining tokens as resources; otherwise all tokens are resources.

        @param configQueue: string such as "all.q mem_free=1G" or "mem_free=1G"
        @return: tuple (queueName, lResources); ("", []) for a blank string
        """
        # split() without argument ignores leading/repeated blanks, which would
        # otherwise produce empty tokens and hide the queue name
        tokens = configQueue.replace("'", "").split()
        if not tokens:
            return "", []
        if re.match(r".*\.q", tokens[0]):
            return tokens[0], tokens[1:]
        return "", tokens

    def createGroupidIfItNotExist(self):
        """Set self.job.groupid to self.groupid, or to the current pid if empty."""
        if self.groupid == "":
            self.job.groupid = str(os.getpid())
        else:
            self.job.groupid = self.groupid

    def beginRun(self):
        """Prepare the job group before submitting new jobs.

        If the job table still has unfinished jobs for this group, wait for
        them; otherwise call cleanJobGroup on the group.
        """
        self.createGroupidIfItNotExist()
        if self.jobdb.hasUnfinishedJob(self.job.groupid):
            self.jobdb.waitJobGroup(self.job.groupid)
        else:
            self.jobdb.cleanJobGroup(self.job.groupid)

    ## Launch one job in parallel
    def runSingleJob(self, cmdStart, cmdFinish="", cmdSize="", cmdCopy=""):
        """Write a launcher script for self.job and submit it to the cluster.

        @param cmdStart: command-line(s) for the job to be launched
        @param cmdFinish: command(s) to retrieve result files
        @param cmdSize: forwarded unchanged to WriteScript.run
        @param cmdCopy: forwarded unchanged to WriteScript.run
        @warning: self.job.jobname and self.job.groupid have to be defined
            before calling this method
        Exits the process with status 1 if the submission fails.
        """
        if self._nbJobs == 0:
            self._nbJobs = 1
        pid = str(os.getpid())
        now = time.localtime()
        #TODO: rename ClusterLauncher_ ...
        pyFileName = "%s/ClusterLauncher_%s_%s_%s-%s-%s_%s.py" % (
            self.cdir, self.job.groupid, self.job.jobname,
            now[0], now[1], now[2], pid)
        self.job.launcher = pyFileName

        #TODO: to remove when refactoring is done
        cmdStart = self._indentCmd(cmdStart)
        cmdFinish = self._indentCmd(cmdFinish)

        iWriteScript = WriteScript(self.job, self.jobdb, self.cdir, self.tmpdir,
                                   self._chooseTemplateWithCopy, self._chooseTemplateLight)
        iWriteScript.run(cmdStart, cmdFinish, pyFileName, cmdSize, cmdCopy)
        # rwxr-xr-x: the generated script must be executable
        os.chmod(pyFileName, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP
                 | stat.S_IROTH | stat.S_IXOTH)
        sys.stdout.flush()
        log = self.jobdb.submitJob(self.job)
        if log != 0:
            print("ERROR while submitting job to the cluster")
            sys.exit(1)

    @staticmethod
    def _timestamp():
        """Return the current local time as "YYYY-mm-dd HH:MM:SS"."""
        return time.strftime("%Y-%m-%d %H:%M:%S")

    def endRun(self, cleanNodes=False):
        """Wait for all jobs of the group, report timings and clean the group.

        @param cleanNodes: if True, also clean the cluster nodes (see cleanNodes)
        """
        print("waiting for %i job(s) with groupid '%s' (%s)" % (self._nbJobs, self.job.groupid, self._timestamp()))
        sys.stdout.flush()
        self.jobdb.waitJobGroup(self.job.groupid)
        if self._nbJobs > 1:
            print("all jobs with groupid '%s' are finished (%s)" % (self.job.groupid, self._timestamp()))
            sys.stdout.flush()

        if cleanNodes:
            print("start cleaning cluster nodes (%s)" % self._timestamp())
            sys.stdout.flush()
            self.cleanNodes()
            print("end cleaning cluster nodes (%s)" % self._timestamp())
            sys.stdout.flush()

        statsExecutionTime = self.getStatsOfExecutionTime()
        if self._nbJobs > 1:
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
            sys.stdout.flush()
        self.jobdb.cleanJobGroup(self.job.groupid)

    def getStatsOfExecutionTime(self, acronyme=""):
        """Collect job execution times from job stdout files.

        Files matching "<acronyme>*.o*" in the current directory are scanned;
        in each, the first line containing "executionTime" is parsed as
        "...=<seconds>" and its value added to a Stat.

        @param acronyme: file prefix; self.acronyme is used when empty
        @return: Stat instance holding one value per file that reported one
        """
        if acronyme == "":
            acronyme = self.acronyme
        iStat = Stat()
        for fileName in glob.glob("%s*.o*" % acronyme):
            with open(fileName, "r") as fH:
                for line in fH:
                    if "executionTime" in line:
                        # strip() instead of line[:-1]: the last line of a file
                        # may have no trailing newline
                        iStat.add(float(line.split("=")[1].strip()))
                        break
        return iStat

    def clean(self, acronyme="", stdout=True, stderr=True):
        """Remove launcher scripts and job output files from the current directory.

        @param acronyme: file prefix; self.acronyme is used when empty
        @param stdout: also remove "<acronyme>*.o*" files
        @param stderr: also remove "<acronyme>*.e*" files
        """
        if acronyme == "":
            acronyme = self.acronyme
        lFileToRemove = glob.glob("ClusterLauncher*%s*.py" % acronyme)
        if stdout:
            lFileToRemove.extend(glob.glob("%s*.o*" % acronyme))
        if stderr:
            lFileToRemove.extend(glob.glob("%s*.e*" % acronyme))
        for fileName in lFileToRemove:
            os.remove(fileName)

    #TODO: handle of nodesMustBeCleaned => class attribute ?
    def runLauncherForMultipleJobs(self, acronymPrefix, lCmdsTuples, cleanMustBeDone=True, nodesMustBeCleaned=False):
        """Submit one job per command tuple, wait for all of them, then clean up.

        Jobs are named "<acronymPrefix>_<n>" with n starting at 1.

        @param acronymPrefix: prefix of job names and output files
        @param lCmdsTuples: iterable of (cmdStart[, cmdFinish[, cmdSize[, cmdCopy]]])
            tuples passed to runSingleJob; items beyond the fourth are ignored
        @param cleanMustBeDone: remove launcher scripts and job outputs at the end
        @param nodesMustBeCleaned: also clean the cluster nodes (see endRun)
        """
        self.beginRun()
        print("submitting job(s) with groupid '%s' (%s)" % (self.job.groupid, self._timestamp()))
        sys.stdout.flush()
        for cmdsTuple in lCmdsTuples:
            self._nbJobs += 1
            self.acronyme = "%s_%s" % (acronymPrefix, self._nbJobs)
            self.job.jobname = self.acronyme
            # accept 1 to 4 commands instead of only exactly 2 or at least 4
            self.runSingleJob(*cmdsTuple[:4])
            # each submission needs a fresh Job in the same group
            self._createJobInstance()
            self.createGroupidIfItNotExist()
        self.acronyme = acronymPrefix
        self.endRun(nodesMustBeCleaned)
        if cleanMustBeDone:
            self.clean("%s_" % acronymPrefix)
        self.jobdb.close()

    @staticmethod
    def _joinCmds(lCmds, suffix):
        """Return each command of lCmds followed by suffix, concatenated ("" for None)."""
        return "".join(["%s%s" % (cmd, suffix) for cmd in (lCmds or [])])

    def prepareCommands(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        """Build tab-indented command strings for runSingleJob.

        @return: tuple (cmdStart, cmdFinish, cmdSize, cmdCopy) where cmdStart
            holds lCmdStart followed by lCmds, each command ending with "\\n\\t"
            (or "\\n\\t\\t" for cmdSize and cmdCopy)
        """
        cmdStart = self._joinCmds(lCmdStart, "\n\t") + self._joinCmds(lCmds, "\n\t")
        cmdFinish = self._joinCmds(lCmdFinish, "\n\t")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    #TODO: to remove when refactoring is done
    def prepareCommands_withoutIndentation(self, lCmds, lCmdStart=None, lCmdFinish=None, lCmdSize=None, lCmdCopy=None):
        """Same as prepareCommands but cmdStart/cmdFinish commands end with "\\n" only."""
        cmdStart = self._joinCmds(lCmdStart, "\n") + self._joinCmds(lCmds, "\n")
        cmdFinish = self._joinCmds(lCmdFinish, "\n")
        cmdSize = self._joinCmds(lCmdSize, "\n\t\t")
        cmdCopy = self._joinCmds(lCmdCopy, "\n\t\t")
        return (cmdStart, cmdFinish, cmdSize, cmdCopy)

    def getSystemCommand(self, prg, lArgs):
        """Return the Python statement string 'log = os.system("<prg> <args...>")'."""
        return "log = os.system(\"" + prg + "".join([" " + arg for arg in lArgs]) + "\")"

    def cleanNodes(self):
        """Clean the temporary directory of the nodes that ran this job group.

        Uses the effective group id (self.job.groupid when self.groupid is
        empty, i.e. when it was generated from the pid). Does nothing if no
        group id is known, to avoid cleaning with the catch-all pattern "*".
        """
        groupid = self.groupid or getattr(self.job, "groupid", "")
        if not groupid:
            print("WARNING: no groupid defined, cluster nodes are not cleaned")
            sys.stdout.flush()
            return
        iCleanClusterNodeAfterRepet = CleanClusterNodesAfterRepet()
        iCleanClusterNodeAfterRepet.setLNodes(self.jobdb.getNodesListByGroupId(groupid))
        iCleanClusterNodeAfterRepet.setTempDirectory(self.tmpdir)
        iCleanClusterNodeAfterRepet.setPattern("%s*" % groupid)
        iCleanClusterNodeAfterRepet.run()

    #TODO: to remove when refactoring is done
    def _indentCmd(self, cmd):
        """Prefix every line but the first with a tab; each line ends with "\\n".

        A whitespace-only trailing fragment (e.g. after a final "\\n" or
        "\\n\\t") is dropped; a real last line without "\\n" is kept.
        """
        lCmd = cmd.split("\n")
        if len(lCmd) > 1 and lCmd[-1].strip() == "":
            lCmd = lCmd[:-1]
        return "".join(["%s\n" % lCmd[0]] + ["\t%s\n" % line for line in lCmd[1:]])

    def _createJobInstance(self):
        """Create self.job on self.queue, with self.lResources when there are any."""
        if self.lResources == []:
            #To have mem_free=1G:
            self.job = Job(queue=self.queue)
        else:
            self.job = Job(queue=self.queue, lResources=self.lResources)