Mercurial > repos > yufei-luo > s_mart
comparison commons/pyRepetUnit/components/AbstractClusterLauncher.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
17:b0e8584489e6 | 18:94ab73e8a190 |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 # Copyright INRA (Institut National de la Recherche Agronomique) | |
4 # http://www.inra.fr | |
5 # http://urgi.versailles.inra.fr | |
6 # | |
7 # This software is governed by the CeCILL license under French law and | |
8 # abiding by the rules of distribution of free software. You can use, | |
9 # modify and/ or redistribute the software under the terms of the CeCILL | |
10 # license as circulated by CEA, CNRS and INRIA at the following URL | |
11 # "http://www.cecill.info". | |
12 # | |
13 # As a counterpart to the access to the source code and rights to copy, | |
14 # modify and redistribute granted by the license, users are provided only | |
15 # with a limited warranty and the software's author, the holder of the | |
16 # economic rights, and the successive licensors have only limited | |
17 # liability. | |
18 # | |
19 # In this respect, the user's attention is drawn to the risks associated | |
20 # with loading, using, modifying and/or developing or reproducing the | |
21 # software by the user in light of its specific status of free software, | |
22 # that may mean that it is complicated to manipulate, and that also | |
23 # therefore means that it is reserved for developers and experienced | |
24 # professionals having in-depth computer knowledge. Users are therefore | |
25 # encouraged to load and test the software's suitability as regards their | |
26 # requirements in conditions enabling the security of their systems and/or | |
27 # data to be ensured and, more generally, to use and operate it in the | |
28 # same conditions as regards security. | |
29 # | |
30 # The fact that you are presently reading this means that you have had | |
31 # knowledge of the CeCILL license and that you accept its terms. | |
32 | |
33 import getopt | |
34 import time | |
35 import glob | |
36 import sys | |
37 import os | |
38 | |
39 from commons.core.checker.CheckerException import CheckerException | |
40 from commons.core.sql.RepetJob import RepetJob | |
41 from commons.core.sql.Job import Job | |
42 from commons.core.stat.Stat import Stat | |
43 from pyRepet.launcher.AbstractProgramLauncher import AbstractProgramLauncher | |
44 | |
# Placeholder token standing for the name of a job's input file. Command-line
# and file-name templates contain this token; it is substituted with the real
# file name via str.replace() when the commands of each job are generated.
GENERIC_IN_FILE = "zDUMMYz"
46 | |
47 | |
## Abstract class to launch a program in parallel on a cluster.
#
class AbstractClusterLauncher(object):  # ( IClusterLauncher )
    """
    Base class that submits one cluster job per input file and waits for them.

    For every file matching '<inputDir>/*.<suffix>' a small Python job script
    is generated (see getJobCommandsAsString) which runs the wrapper command
    of each program launcher in 'lPrgLaunchers', keeps/removes the requested
    files and records its status in the job table.

    The following methods are called by this class but are not defined in it;
    they are expected to be provided by concrete subclasses:
    initializeJob(fileName, count), checkProgramAvailability(),
    removeAllJobFiles(), removeAllJobStdouts(), removeAllJobStderrs() and
    catOutputFiles().
    """

    def __init__(self):
        """
        Constructor: set every attribute to its default value.
        """
        self._inputDir = ""             # path to the directory with input files
        self._queueName = ""            # name of the queue on the cluster
        self._groupId = ""              # identifier of the group of jobs (groupid)
        self._inputFileSuffix = "fa"    # suffix of the input files (default='fa')
        self._prgAcronym = ""           # acronym of the program to launch
        self._configFile = ""           # name of the configuration file (connect to MySQL)
        self._currentDir = os.getcwd()  # path to the current directory
        self._tmpDir = ""               # path to the temporary directory
        self._jobTable = "jobs"         # name of the table recording the jobs
        self._catOutFiles = False       # concatenate output files of all jobs
        self._clean = False             # clean job file, job stdout, job table...
        self._verbose = 1               # verbosity level
        self.jobdb = None               # RepetJob instance (created in start())
        self.job = Job()                # Job instance

        self._nbJobs = 0
        self._cmdLineGenericOptions = "hi:Q:g:S:a:C:d:j:Zcv:"
        self._cmdLineSpecificOptions = ""

        self._exeWrapper = "AbstractProgramLauncher.py"
        self._prgLauncher = None
        # list of instances derived from AbstractProgramLauncher()
        # If several program are launched successively in the same job,
        # 'lPrgLaunchers' has to be filled before run().
        self.lPrgLaunchers = []

    # ------------------------------------------------------------------
    # command-line handling
    # ------------------------------------------------------------------

    def setProgramLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Forward one command-line option to the program launcher instance.
        @param o: option flag (e.g. '-i')
        @param a: option value ('' for flags without a value)
        """
        self.getProgramLauncherInstance().setASpecificAttributeFromCmdLine(o, a)

    def setClusterLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Set the generic (cluster launcher) attribute matching option 'o'.
        '-h' prints the help and exits with status 0; unknown options are
        silently ignored here.
        @param o: option flag (e.g. '-i')
        @param a: option value ('' for flags without a value)
        """
        if o == "-h":
            print(self.getHelpAsString())
            sys.exit(0)
        elif o == "-i":
            self.setInputDirectory(a)
        elif o == "-Q":
            self.setQueueName(a)
        elif o == "-g":
            self.setGroupIdentifier(a)
        elif o == "-S":
            self.setInputFileSuffix(a)
        elif o == "-a":
            self.setAcronym(a)
        elif o == "-C":
            self.setConfigFile(a)
        elif o == "-d":
            self.setTemporaryDirectory(a)
        elif o == "-j":
            self.setJobTableName(a)
        elif o == "-Z":
            self.setCatOutputFiles()
        elif o == "-c":
            self.setClean()
        elif o == "-v":
            self.setVerbosityLevel(a)

    def setAttributesFromCmdLine(self):
        """
        Parse sys.argv and dispatch every option to both the cluster launcher
        and the program launcher. On a parsing error, print the error and the
        help, then exit with status 1.
        """
        try:
            opts, _args = getopt.getopt(sys.argv[1:], self.getCmdLineOptions())
        except getopt.GetoptError as err:
            print(str(err))
            print(self.getHelpAsString())
            sys.exit(1)
        for o, a in opts:
            self.setClusterLauncherAttributeFromCmdLine(o, a)
            self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setAGenericAttributeFromCmdLine(self, o, a=""):
        """Alias of setClusterLauncherAttributeFromCmdLine()."""
        self.setClusterLauncherAttributeFromCmdLine(o, a)

    def setASpecificAttributeFromCmdLine(self, o, a=""):
        """Alias of setProgramLauncherAttributeFromCmdLine()."""
        self.setProgramLauncherAttributeFromCmdLine(o, a)

    # ------------------------------------------------------------------
    # setters
    # ------------------------------------------------------------------

    def setInputDirectory(self, arg):
        """Set the path to the directory with the input files."""
        self._inputDir = arg

    def setQueueName(self, arg):
        """Set the name of the queue on the cluster."""
        self._queueName = arg

    def setGroupIdentifier(self, arg):
        """Set the identifier of the group of jobs (groupid)."""
        self._groupId = arg

    def setInputFileSuffix(self, arg):
        """Set the suffix of the input files."""
        self._inputFileSuffix = arg

    def setAcronym(self, arg):
        """Set the acronym of the program to launch."""
        self._prgAcronym = arg

    def setConfigFile(self, arg):
        """
        Set the configuration file. A bare file name (no directory part) is
        resolved against the current working directory.
        """
        if os.path.dirname(arg) == "":
            self._configFile = os.path.join(os.getcwd(), arg)
        else:
            self._configFile = arg

    def setCurrentDirectory(self, arg=None):
        """
        Set the path to the current directory.
        @param arg: directory path; when omitted, the working directory at
                    call time is used (not the one at import time)
        """
        if arg is None:
            arg = os.getcwd()
        self._currentDir = arg

    def setTemporaryDirectory(self, arg):
        """Set the path to the temporary directory."""
        self._tmpDir = arg

    def setJobTableName(self, arg):
        """Set the name of the table recording the jobs."""
        self._jobTable = arg

    def setCatOutputFiles(self):
        """Request the concatenation of the output files of all jobs."""
        self._catOutFiles = True

    def setClean(self):
        """Request the removal of job files, stdouts and stderrs at the end."""
        self._clean = True

    def setVerbosityLevel(self, arg):
        """Set the verbosity level; 'arg' is converted with int()."""
        self._verbose = int(arg)

    def setExecutableWrapper(self, arg="AbstractProgramLauncher.py"):
        """Set the name of the executable wrapper."""
        self._exeWrapper = arg

    def setSingleProgramLauncher(self):
        """
        Set the wrapper and program command-lines of the program launcher.
        Append the program launcher to 'self.lPrgLaunchers'.
        """
        launcher = self.getProgramLauncherInstance()
        launcher.setWrapperCommandLine()
        launcher.setProgramCommandLine()
        self.lPrgLaunchers.append(launcher)

    # ------------------------------------------------------------------
    # help
    # ------------------------------------------------------------------

    def getGenericHelpAsString(self):
        """
        Return the usage text describing the generic options.
        """
        lines = [
            "usage: %s.py [options]" % (type(self).__name__),
            "\ngeneric options:",
            "\n -h: this help",
            "\n -i: directory with input files (absolute path)",
            "\n -Q: name of the queue on the cluster",
            "\n -g: identifier of the group of jobs (groupid)",
            "\n -S: suffix of the input files (default='fa')",
            "\n -a: acronym of the program to be launched (default='%s')" % (self.getAcronym()),
            "\n -C: configuration file to connect to MySQL (absolute path or in current dir)",
            "\n -d: temporary directory (absolute path, default=None)",
            "\n -j: table recording the jobs (default='jobs')",
            # '-Z' is accepted by getCmdLineOptions() and was missing here
            "\n -Z: concatenate the output files of all jobs",
            "\n -c: clean the temporary data",
            "\n -v: verbosity level (default=0/1/2)",
        ]
        return "".join(lines)

    def getSpecificHelpAsString(self):
        """
        Return the usage text describing the program-specific options.
        Empty in this base class; meant to be overridden.
        """
        return ""

    def getHelpAsString(self):
        """
        Return the full usage text (generic followed by specific options).
        """
        # an override may still return None: treat it as 'no specific help'
        # instead of failing on 'str + None'
        specific = self.getSpecificHelpAsString() or ""
        return self.getGenericHelpAsString() + specific

    # ------------------------------------------------------------------
    # getters
    # ------------------------------------------------------------------

    def getInputDirectory(self):
        """Return the path to the directory with the input files."""
        return self._inputDir

    def getQueueName(self):
        """Return the name of the queue on the cluster."""
        return self._queueName

    def getGroupIdentifier(self):
        """Return the identifier of the group of jobs."""
        return self._groupId

    def getInputFileSuffix(self):
        """Return the suffix of the input files."""
        return self._inputFileSuffix

    def getAcronym(self):
        """Return the acronym of the program to launch."""
        return self._prgAcronym

    def getConfigFile(self):
        """Return the path to the configuration file."""
        return self._configFile

    def getCurrentDirectory(self):
        """Return the path to the current directory."""
        return self._currentDir

    def getTemporaryDirectory(self):
        """Return the path to the temporary directory."""
        return self._tmpDir

    def getJobTableName(self):
        """Return the name of the table recording the jobs."""
        return self._jobTable

    def getCatOutputFiles(self):
        """Return True if the output files have to be concatenated."""
        return self._catOutFiles

    def getClean(self):
        """Return True if the temporary data have to be cleaned."""
        return self._clean

    def getVerbosityLevel(self):
        """Return the verbosity level."""
        return self._verbose

    def getWrapperName(self):
        """Return the wrapper name given by the program launcher."""
        return self.getProgramLauncherInstance().getWrapperName()

    def getProgramName(self):
        """Return the program name given by the program launcher."""
        return self.getProgramLauncherInstance().getProgramName()

    def getPatternToConcatenate(self):
        """
        Return the program launcher's output file name in which the generic
        input-file placeholder is replaced by '*'.
        """
        return self.getProgramLauncherInstance().getOutputFile().replace(GENERIC_IN_FILE, "*")

    def getProgramLauncherInstance(self):
        """
        Return the program launcher, creating an AbstractProgramLauncher on
        first use if none has been set.
        """
        if self._prgLauncher is None:
            self._prgLauncher = AbstractProgramLauncher()
        return self._prgLauncher

    def getInputFilesList(self):
        """Return the list of '<inputDir>/*.<suffix>' paths found by glob."""
        return glob.glob("%s/*.%s" % (self._inputDir, self._inputFileSuffix))

    def getCmdLineOptions(self):
        """
        Return the getopt short-option string of the generic options.
        NOTE(review): '_cmdLineGenericOptions' / '_cmdLineSpecificOptions'
        are initialised in the constructor but not used here -- confirm
        whether subclasses are expected to override this method.
        """
        return "hi:Q:g:S:a:C:d:j:Zcv:"

    def getProgramCommandLineAsString(self):
        """
        Return the command-line to launch in each job.
        Specified in each wrapper.
        """
        pass

    def getListFilesToKeep(self):
        """
        Return the list of files to keep at the end of each job.
        Specified in each wrapper.
        """
        pass

    def getListFilesToRemove(self):
        """
        Return the list of files to remove at the end of each job.
        Specified in each wrapper.
        """
        pass

    # ------------------------------------------------------------------
    # generation of the job script
    # ------------------------------------------------------------------

    def getJobFileNameAsString(self, count):
        """
        Return the name of the job file as a string.
        @param count: job number (e.g. '1') or '*'
        @type count: integer or string
        """
        jobFile = "ClusterLauncher"
        jobFile += "_groupid%s" % (self.getGroupIdentifier())
        if count != "*":
            jobFile += "_job%i" % (count)
            jobFile += "_time%s" % (time.strftime("%Y-%m-%d-%H-%M-%S"))
        else:
            # glob pattern matching every job file of the current month
            jobFile += "_job*"
            jobFile += "_time%s-*" % (time.strftime("%Y-%m"))
        jobFile += ".py"
        return jobFile

    def getCmdUpdateJobStatusAsString(self, newStatus):
        """
        Return the command to update the job status in the table.
        The result is one line of Python source ('os.system( "..." )')
        meant to be embedded in the generated job script.
        Raises KeyError if the REPET_PATH environment variable is not set.
        """
        prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py"
        cmd = prg
        cmd += " -t %s" % (self.job.tablename)
        # when the job id is purely numeric, the job name is passed instead
        if str(self.job.jobid).isdigit():
            cmd += " -j %s" % (self.job.jobname)
        else:
            cmd += " -j %s" % (self.job.jobid)
        cmd += " -g %s" % (self.job.groupid)
        if self.job.queue != "":
            cmd += " -q %s" % (self.job.queue)
        cmd += " -s %s" % (newStatus)
        cmd += " -c %s" % (self.getConfigFile())
        cmd += " -v %i" % (self._verbose)
        return "os.system( \"%s\" )\n" % (cmd)

    def getCmdToLaunchWrapper(self, fileName, genericCmd, exeWrapper):
        """
        Return the launching command as a string.
        Launch the wrapper, retrieve its exit status, update status if error.
        @param fileName: input file name substituted for the placeholder
        @param genericCmd: wrapper command-line containing the placeholder
        @param exeWrapper: wrapper name, only used in the error message
        """
        specificCmd = genericCmd.replace(GENERIC_IN_FILE, fileName)
        cmd = ""
        cmd += "print \"LAUNCH: %s\"\n" % (specificCmd)
        cmd += "sys.stdout.flush()\n"
        cmd += "exitStatus = os.system ( \"%s\" )\n" % (specificCmd)
        cmd += "if exitStatus != 0:\n"
        cmd += "\tprint \"ERROR: wrapper '%s'" % (exeWrapper)
        # no formatting here: '%i' is resolved by the generated script itself
        cmd += " returned exit status '%i'\" % ( exitStatus )\n"
        cmd += "\tos.chdir( \"%s\" )\n" % (self.getTemporaryDirectory())
        cmd += "\tshutil.move( newDir, '%s' )\n" % (self.getCurrentDirectory())
        cmd += "\t%s" % (self.getCmdUpdateJobStatusAsString("error"))
        cmd += "\tsys.exit(1)\n"
        return cmd

    def getCmdToKeepFiles(self, fileName, lFilesToKeep):
        """
        Return the commands to keep the output files.
        For each file, the generated code fails the job if the file is
        missing, otherwise copies it to the current directory unless a file
        of that name is already there. 'None' is treated as an empty list.
        """
        cmd = ""
        for f in lFilesToKeep or []:
            f = f.replace(GENERIC_IN_FILE, fileName)
            cmd += "if not os.path.exists( \"%s\" ):\n" % (f)
            cmd += "\tprint \"ERROR: output file '%s' doesn't exist\"\n" % (f)
            cmd += "\t%s" % (self.getCmdUpdateJobStatusAsString("error"))
            cmd += "\tsys.exit(1)\n"
            cmd += "if not os.path.exists( \"%s/%s\" ):\n" % (self._currentDir, f)
            cmd += "\tshutil.copy( \"%s\", \"%s/%s\" )\n" % (f, self.getCurrentDirectory(), f)
        return cmd

    def getCmdToRemoveFiles(self, fileName, lFilesToRemove):
        """
        Return the commands to remove the temporary files.
        'None' is treated as an empty list.
        """
        cmd = ""
        for f in lFilesToRemove or []:
            f = f.replace(GENERIC_IN_FILE, fileName)
            cmd += "if os.path.exists( \"%s\" ):\n" % (f)
            cmd += "\tos.remove( \"%s\" )\n" % (f)
        return cmd

    def getJobCommandsAsString(self, fileName, jobName, minFreeGigaInTmpDir=1):
        """
        Return all the job commands as a string.
        The string is the source of a stand-alone Python script which:
        marks the job as running, checks the temporary directory (existence
        and free space), works in a fresh sub-directory, symlinks the input
        file, runs every program launcher, cleans up and marks the job as
        finished.
        @param fileName: base name of the input file handled by the job
        @param jobName: job name, used in the name of the working sub-directory
        @param minFreeGigaInTmpDir: minimum free space (Gb) required in the
                                    temporary directory
        """
        tmpDir = self.getTemporaryDirectory()

        cmd = "#!/usr/bin/env python\n"
        cmd += "\n"
        cmd += "import os\n"
        cmd += "import sys\n"
        cmd += "import shutil\n"
        cmd += "import time\n"
        cmd += "\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "beginTime = time.time()\n"
        cmd += "print 'beginTime=%f' % ( beginTime )\n"
        cmd += "\n"
        cmd += self.getCmdUpdateJobStatusAsString("running")
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % (tmpDir)
        cmd += "\tprint \"ERROR: working dir '%s' doesn't exist\"\n" % (tmpDir)
        cmd += "\t%s" % (self.getCmdUpdateJobStatusAsString("error"))
        cmd += "\tsys.exit(1)\n"
        cmd += "freeSpace = os.statvfs( \"%s\" )\n" % (tmpDir)
        # nb blocs * bloc size in bytes > 1 GigaByte ?
        cmd += "if ( freeSpace.f_bavail * freeSpace.f_frsize ) / 1073741824.0 < %i:\n" % (minFreeGigaInTmpDir)
        cmd += "\tprint \"ERROR: less than %iGb in '%s'\"\n" % (minFreeGigaInTmpDir, tmpDir)
        cmd += "\t%s" % (self.getCmdUpdateJobStatusAsString("error"))
        cmd += "\tsys.exit(1)\n"
        cmd += "print \"working dir: %s\"\n" % (tmpDir)
        cmd += "sys.stdout.flush()\n"
        cmd += "os.chdir( \"%s\" )\n" % (tmpDir)
        cmd += "\n"
        cmd += "newDir = \"%s_%s_%s\"\n" % (self.getGroupIdentifier(), jobName, time.strftime("%Y%m%d-%H%M%S"))
        cmd += "if os.path.exists( newDir ):\n"
        cmd += "\tshutil.rmtree( newDir )\n"
        cmd += "os.mkdir( newDir )\n"
        cmd += "os.chdir( newDir )\n"
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % (fileName)
        cmd += "\tos.symlink( \"%s/%s\", \"%s\" )\n" % (self._inputDir, fileName, fileName)
        cmd += "\n"

        for pL in self.lPrgLaunchers:
            cmd += self.getCmdToLaunchWrapper(
                fileName,
                pL.getWrapperCommandLine(),
                "%s.py" % (type(pL).__name__))
            cmd += "\n"
            cmd += self.getCmdToKeepFiles(fileName, pL.getListFilesToKeep())
            cmd += "\n"
            cmd += self.getCmdToRemoveFiles(fileName, pL.getListFilesToRemove())

        # NOTE(review): clean-up emitted once, after all launchers; the
        # nesting was reconstructed from a flattened listing -- confirm.
        cmd += "if os.path.exists( \"%s\" ):\n" % (fileName)
        cmd += "\tos.remove( \"%s\" )\n" % (fileName)
        cmd += "os.chdir( \"..\" )\n"
        cmd += "shutil.rmtree( newDir )\n"
        cmd += self.getCmdUpdateJobStatusAsString("finished")
        cmd += "\n"
        cmd += "endTime = time.time()\n"
        cmd += "print 'endTime=%f' % ( endTime)\n"
        cmd += "print 'executionTime=%f' % ( endTime - beginTime )\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "sys.exit(0)\n"
        return cmd

    # ------------------------------------------------------------------
    # statistics
    # ------------------------------------------------------------------

    def getStatsOfExecutionTime(self):
        """
        Return a Stat object filled with the execution time of each job,
        i.e. the duration in seconds (float) read from the first
        'executionTime=<value>' line of every '<currentDir>/<acronym>*.o*'
        file.
        """
        stat = Stat()
        pattern = "%s/%s*.o*" % (self.getCurrentDirectory(), self.getAcronym())
        for jobStdout in glob.glob(pattern):
            # 'with' guarantees the file is closed even if parsing fails
            with open(jobStdout, "r") as handle:
                for line in handle:
                    if "executionTime" in line:
                        # strip() instead of [:-1]: the last line of a file
                        # may not end with a newline
                        stat.add(float(line.strip().split("=")[1]))
                        break
        return stat

    def formatGroupidAndTime(self):
        """Return '<groupid> <YYYY-mm-dd HH:MM:SS>' for progress messages."""
        return self.job.groupid + " " + time.strftime("%Y-%m-%d %H:%M:%S") + ""

    # ------------------------------------------------------------------
    # submission
    # ------------------------------------------------------------------

    def submitJob(self, lInFiles):
        """
        Initialize and submit one job per input file (numbered from 1).
        Exit with status 1 as soon as a submission fails.
        @param lInFiles: list of paths to the input files
        """
        for count, inFile in enumerate(lInFiles, 1):
            fileName = os.path.basename(inFile)
            if self._verbose > 1:
                print("processing '%s' # %i..." % (fileName, count))
                sys.stdout.flush()

            self.initializeJob(fileName, count)
            time.sleep(0.5)
            exitStatus = self.jobdb.submitJob(self.job)
            if exitStatus != 0:
                print("ERROR while submitting job '%i' to the cluster" % (count))
                sys.exit(1)

    # ------------------------------------------------------------------
    # checks
    # ------------------------------------------------------------------

    def checkClusterLauncherAttributes(self):
        """
        Check the generic attributes.
        Raise CheckerException if the input directory, the group identifier,
        the acronym or the configuration file is missing, or if the input
        directory or the configuration file doesn't exist.
        If no temporary directory was given, the current directory is used.
        """
        if self.getInputDirectory() == "":
            message = "ERROR: missing input directory"
            raise CheckerException(message)
        if not os.path.exists(self.getInputDirectory()):
            message = "ERROR: input directory '%s' doesn't exist" % (self.getInputDirectory())
            raise CheckerException(message)
        if self.getGroupIdentifier() == "":
            message = "ERROR: missing group identifier"
            raise CheckerException(message)
        if self.getAcronym() == "":
            message = "ERROR: missing program acronym"
            raise CheckerException(message)
        if self.getConfigFile() == "":
            message = "ERROR: missing config file to access MySQL"
            raise CheckerException(message)
        if not os.path.exists(self.getConfigFile()):
            message = "ERROR: config file '%s' doesn't exist" % (self.getConfigFile())
            raise CheckerException(message)
        if self.getTemporaryDirectory() == "":
            self.setTemporaryDirectory(self._currentDir)

    def checkGenericAttributes(self):
        """Alias of checkClusterLauncherAttributes()."""
        self.checkClusterLauncherAttributes()

    def checkProgramLauncherAttributes(self):
        """Ask the program launcher to check its specific attributes."""
        self.getProgramLauncherInstance().checkSpecificAttributes()

    def checkSpecificAttributes(self):
        """Alias of checkProgramLauncherAttributes()."""
        self.checkProgramLauncherAttributes()

    # ------------------------------------------------------------------
    # life cycle
    # ------------------------------------------------------------------

    def start(self):
        """
        Prepare the run: make sure at least one program launcher is
        registered and has non-empty command-lines, check the program
        attributes, fill the Job instance (table, groupid, queue, resources)
        and open the job database.
        If the group still has unfinished jobs, wait for them and return;
        otherwise clean the job group.
        Exit with status 1 on an empty command-line or a failed check.
        """
        if not self.lPrgLaunchers:
            self.setSingleProgramLauncher()
        for pL in self.lPrgLaunchers:
            if pL.getWrapperCommandLine() == "":
                string = "ERROR: wrapper command is empty !"
                print(string)
                sys.exit(1)
            if pL.getProgramCommandLine() == "":
                string = "ERROR: program command is empty !"
                print(string)
                sys.exit(1)
        # NOTE(review): called once after the loop; the nesting was
        # reconstructed from a flattened listing -- confirm.
        self.checkProgramAvailability()

        try:
            self.checkProgramLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        if self.getVerbosityLevel() > 0:
            string = "START %s" % (type(self).__name__)
            print(string)
        self.job.tablename = self.getJobTableName()
        self.job.groupid = self.getGroupIdentifier()
        # the queue name may be followed by space-separated resources
        tokens = self.getQueueName().replace("'", "").split(" ")
        self.job.setQueue(tokens[0])
        if len(tokens) > 1:
            self.job.lResources = tokens[1:]
        if self.getVerbosityLevel() > 0:
            print("groupid: %s" % (self.getGroupIdentifier()))
        self.jobdb = RepetJob(cfgFileName=self.getConfigFile())
        if self.jobdb.hasUnfinishedJob(self.job.tablename, self.job.groupid):
            self.jobdb.waitJobGroup(self.job.tablename, self.job.groupid)
            return
        self.jobdb.cleanJobGroup(self.job.tablename, self.job.groupid)
        sys.stdout.flush()

    def end(self):
        """
        Finish the run: optionally remove job files/stdouts/stderrs,
        optionally concatenate the output files, then close the job database.
        """
        if self.getClean():
            self.removeAllJobFiles()
            self.removeAllJobStdouts()
            self.removeAllJobStderrs()

        if self.getCatOutputFiles():
            self.catOutputFiles()

        self.jobdb.close()

        if self.getVerbosityLevel() > 0:
            string = "END %s" % (type(self).__name__)
            print(string)
        sys.stdout.flush()

    def run(self):
        """
        Entry point: check the generic attributes, start(), submit one job
        per input file, wait for the whole group, report the execution times
        (when verbose), clean the job group and end().
        Exit with status 1 if the generic attributes are invalid.
        """
        try:
            self.checkClusterLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        self.start()

        lInFiles = self.getInputFilesList()
        self._nbJobs = len(lInFiles)

        if self._verbose > 0:
            string = "submitting " + str(self._nbJobs) + " jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.submitJob(lInFiles)

        if self._verbose > 0:
            string = "waiting for jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.jobdb.waitJobGroup(self.job.tablename, self.job.groupid)

        if self._verbose > 0:
            string = "all jobs completed ! " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()
            statsExecutionTime = self.getStatsOfExecutionTime()
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
            sys.stdout.flush()

        self.jobdb.cleanJobGroup(self.job.tablename, self.job.groupid)

        self.end()
605 | |
606 |