comparison commons/pyRepetUnit/components/AbstractClusterLauncher.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
comparison
equal deleted inserted replaced
17:b0e8584489e6 18:94ab73e8a190
1 #!/usr/bin/env python
2
3 # Copyright INRA (Institut National de la Recherche Agronomique)
4 # http://www.inra.fr
5 # http://urgi.versailles.inra.fr
6 #
7 # This software is governed by the CeCILL license under French law and
8 # abiding by the rules of distribution of free software. You can use,
9 # modify and/ or redistribute the software under the terms of the CeCILL
10 # license as circulated by CEA, CNRS and INRIA at the following URL
11 # "http://www.cecill.info".
12 #
13 # As a counterpart to the access to the source code and rights to copy,
14 # modify and redistribute granted by the license, users are provided only
15 # with a limited warranty and the software's author, the holder of the
16 # economic rights, and the successive licensors have only limited
17 # liability.
18 #
19 # In this respect, the user's attention is drawn to the risks associated
20 # with loading, using, modifying and/or developing or reproducing the
21 # software by the user in light of its specific status of free software,
22 # that may mean that it is complicated to manipulate, and that also
23 # therefore means that it is reserved for developers and experienced
24 # professionals having in-depth computer knowledge. Users are therefore
25 # encouraged to load and test the software's suitability as regards their
26 # requirements in conditions enabling the security of their systems and/or
27 # data to be ensured and, more generally, to use and operate it in the
28 # same conditions as regards security.
29 #
30 # The fact that you are presently reading this means that you have had
31 # knowledge of the CeCILL license and that you accept its terms.
32
33 import getopt
34 import time
35 import glob
36 import sys
37 import os
38
39 from commons.core.checker.CheckerException import CheckerException
40 from commons.core.sql.RepetJob import RepetJob
41 from commons.core.sql.Job import Job
42 from commons.core.stat.Stat import Stat
43 from pyRepet.launcher.AbstractProgramLauncher import AbstractProgramLauncher
44
# Placeholder token replaced, in every generated job command, by the actual
# input file name (see getCmdToLaunchWrapper, getCmdToKeepFiles,
# getCmdToRemoveFiles and getPatternToConcatenate).
GENERIC_IN_FILE = "zDUMMYz"


## Abstract class to launch a program in parallel on a cluster.
#
class AbstractClusterLauncher( object ): #( IClusterLauncher )
    """
    Abstract class to launch a program in parallel on a cluster.

    run() submits one job per input file found in the input directory
    (files matching '<inputDir>/*.<suffix>'), waits for the whole job group
    recorded in the job table through RepetJob, then cleans up.
    """

    def __init__( self ):
        """
        Constructor.
        """
        self._inputDir = ""             # path to the directory with input files
        self._queueName = ""            # name of the queue on the cluster
        self._groupId = ""              # identifier of the group of jobs (groupid)
        self._inputFileSuffix = "fa"    # suffix of the input files (default='fa')
        self._prgAcronym = ""           # acronym of the program to launch
        self._configFile = ""           # name of the configuration file (connect to MySQL)
        self._currentDir = os.getcwd()  # path to the current directory
        self._tmpDir = ""               # path to the temporary directory
        self._jobTable = "jobs"         # name of the table recording the jobs
        self._catOutFiles = False       # concatenate output files of all jobs
        self._clean = False             # clean job file, job stdout, job table...
        self._verbose = 1               # verbosity level
        self.jobdb = None               # RepetJob instance
        self.job = Job()                # Job instance

        self._nbJobs = 0
        # getopt option strings; getCmdLineOptions() returns their concatenation,
        # so subclasses only need to fill '_cmdLineSpecificOptions'.
        self._cmdLineGenericOptions = "hi:Q:g:S:a:C:d:j:Zcv:"
        self._cmdLineSpecificOptions = ""

        self._exeWrapper = "AbstractProgramLauncher.py"
        self._prgLauncher = None
        # list of instances derived from AbstractProgramLauncher()
        # If several programs are launched successively in the same job,
        # 'lPrgLaunchers' has to be filled before run().
        self.lPrgLaunchers = []

    def setProgramLauncherAttributeFromCmdLine(self, o, a=""):
        """Forward a command-line option to the program launcher instance."""
        self.getProgramLauncherInstance().setASpecificAttributeFromCmdLine(o, a)

    def setClusterLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Set the cluster-launcher attribute matching a generic option.
        @param o: option flag (e.g. '-i')
        @param a: option argument (ignored for flags without argument)
        Unknown options are silently ignored; '-h' prints help and exits(0).
        """
        if o == "-h":
            print(self.getHelpAsString())
            sys.exit(0)
        elif o == "-i":
            self.setInputDirectory(a)
        elif o == "-Q":
            self.setQueueName(a)
        elif o == "-g":
            self.setGroupIdentifier(a)
        elif o == "-S":
            self.setInputFileSuffix(a)
        elif o == "-a":
            self.setAcronym(a)
        elif o == "-C":
            self.setConfigFile(a)
        elif o == "-d":
            self.setTemporaryDirectory(a)
        elif o == "-j":
            self.setJobTableName(a)
        elif o == "-Z":
            self.setCatOutputFiles()
        elif o == "-c":
            self.setClean()
        elif o == "-v":
            self.setVerbosityLevel(a)

    def setAttributesFromCmdLine(self):
        """
        Parse sys.argv with getopt and dispatch each option to both the
        cluster launcher and the program launcher.
        On a parsing error, print the error and the help, then exit(1).
        """
        try:
            opts = getopt.getopt(sys.argv[1:], self.getCmdLineOptions())[0]
        except getopt.GetoptError as err:
            print(str(err))
            print(self.getHelpAsString())
            sys.exit(1)
        for o, a in opts:
            self.setClusterLauncherAttributeFromCmdLine(o, a)
            self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setAGenericAttributeFromCmdLine( self, o, a="" ):
        self.setClusterLauncherAttributeFromCmdLine(o, a)

    def setASpecificAttributeFromCmdLine( self, o, a="" ):
        self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setInputDirectory( self, arg ):
        self._inputDir = arg

    def setQueueName( self, arg ):
        self._queueName = arg

    def setGroupIdentifier( self, arg ):
        self._groupId = arg

    def setInputFileSuffix( self, arg ):
        self._inputFileSuffix = arg

    def setAcronym( self, arg ):
        self._prgAcronym = arg

    def setConfigFile( self, arg ):
        """
        Set the configuration file; a bare file name is resolved against the
        current working directory.
        """
        if os.path.dirname( arg ) == "":
            self._configFile = "%s/%s" % ( os.getcwd(), arg )
        else:
            self._configFile = arg

    def setCurrentDirectory( self, arg = None ):
        """
        Set the directory in which job outputs are gathered.
        @param arg: directory path; defaults to the current working directory
                    at call time (a default of os.getcwd() in the signature
                    would be frozen when the module is imported)
        """
        if arg is None:
            arg = os.getcwd()
        self._currentDir = arg

    def setTemporaryDirectory( self, arg ):
        self._tmpDir = arg

    def setJobTableName( self, arg ):
        self._jobTable = arg

    def setCatOutputFiles( self ):
        self._catOutFiles = True

    def setClean( self ):
        self._clean = True

    def setVerbosityLevel( self, arg ):
        self._verbose = int(arg)

    def setExecutableWrapper( self, arg = "AbstractProgramLauncher.py" ):
        self._exeWrapper = arg

    def setSingleProgramLauncher( self ):
        """
        Set the wrapper and program command-lines of the program launcher.
        Append the program launcher to 'self.lPrgLaunchers'.
        """
        prgLauncher = self.getProgramLauncherInstance()
        prgLauncher.setWrapperCommandLine()
        prgLauncher.setProgramCommandLine()
        self.lPrgLaunchers.append( prgLauncher )

    def getGenericHelpAsString( self ):
        """
        Return the usage line followed by the description of the generic
        options (no trailing newline).
        """
        lines = [
            "usage: %s.py [options]" % ( type(self).__name__ ),
            "generic options:",
            " -h: this help",
            " -i: directory with input files (absolute path)",
            " -Q: name of the queue on the cluster",
            " -g: identifier of the group of jobs (groupid)",
            " -S: suffix of the input files (default='fa')",
            " -a: acronym of the program to be launched (default='%s')" % ( self.getAcronym() ),
            " -C: configuration file to connect to MySQL (absolute path or in current dir)",
            " -d: temporary directory (absolute path, default=current directory)",
            " -j: table recording the jobs (default='jobs')",
            " -Z: concatenate the output files of all jobs",
            " -c: clean the temporary data",
            " -v: verbosity level (0/1/2, default=1)",
        ]
        return "\n".join( lines )

    def getSpecificHelpAsString( self ):
        """
        Return the help about program-specific options.
        Empty here; meant to be overridden by subclasses.
        """
        return ""

    def getHelpAsString(self):
        """Return the generic help followed by the specific help."""
        # 'or ""' keeps subclasses whose override returns None from crashing
        return self.getGenericHelpAsString() + ( self.getSpecificHelpAsString() or "" )

    def getInputDirectory( self ):
        return self._inputDir

    def getQueueName( self ):
        return self._queueName

    def getGroupIdentifier( self ):
        return self._groupId

    def getInputFileSuffix( self ):
        return self._inputFileSuffix

    def getAcronym( self ):
        return self._prgAcronym

    def getConfigFile( self ):
        return self._configFile

    def getCurrentDirectory( self ):
        return self._currentDir

    def getTemporaryDirectory( self ):
        return self._tmpDir

    def getJobTableName( self ):
        return self._jobTable

    def getCatOutputFiles( self ):
        return self._catOutFiles

    def getClean( self ):
        return self._clean

    def getVerbosityLevel( self ):
        return self._verbose

    def getWrapperName( self ):
        return self.getProgramLauncherInstance().getWrapperName()

    def getProgramName( self ):
        return self.getProgramLauncherInstance().getProgramName()

    def getPatternToConcatenate( self ):
        """Return the glob pattern matching the output files of all jobs."""
        return self.getProgramLauncherInstance().getOutputFile().replace( GENERIC_IN_FILE, "*" )

    def getProgramLauncherInstance( self ):
        """Return the program launcher, lazily creating a default one."""
        if self._prgLauncher is None:
            self._prgLauncher = AbstractProgramLauncher()
        return self._prgLauncher

    def getInputFilesList(self):
        """Return the paths of '<inputDir>/*.<suffix>' files."""
        return glob.glob("%s/*.%s" % (self._inputDir, self._inputFileSuffix))

    def getCmdLineOptions(self):
        """Return the getopt option string (generic + specific options)."""
        return self._cmdLineGenericOptions + self._cmdLineSpecificOptions

    def getProgramCommandLineAsString( self ):
        """
        Return the command-line to launch in each job.
        Specified in each wrapper (returns None here).
        """
        pass

    def getListFilesToKeep( self ):
        """
        Return the list of files to keep at the end of each job.
        Specified in each wrapper (returns None here).
        """
        pass

    def getListFilesToRemove( self ):
        """
        Return the list of files to remove at the end of each job.
        Specified in each wrapper (returns None here).
        """
        pass

    def getJobFileNameAsString( self, count ):
        """
        Return the name of the job file as a string.
        @param count: job number (e.g. '1') or '*'
        @type count: integer or string
        With '*', the result is a glob pattern matching job files of any job
        created during the current month.
        """
        jobFile = "ClusterLauncher"
        jobFile += "_groupid%s" % ( self.getGroupIdentifier() )
        if count != "*":
            jobFile += "_job%i" % ( count )
            jobFile += "_time%s" % ( time.strftime("%Y-%m-%d-%H-%M-%S") )
        else:
            jobFile += "_job*"
            jobFile += "_time%s-*" % ( time.strftime("%Y-%m") )
        jobFile += ".py"
        return jobFile

    def getCmdUpdateJobStatusAsString( self, newStatus ):
        """
        Return a line of Python code (for the generated job script) that
        calls srptChangeJobStatus.py to set the job status in the table.
        Raises KeyError if the REPET_PATH environment variable is not set.
        """
        prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py"
        cmd = prg
        cmd += " -t %s" % ( self.job.tablename )
        if str(self.job.jobid).isdigit():
            cmd += " -j %s" % ( self.job.jobname )
        else:
            cmd += " -j %s" % ( self.job.jobid )
        cmd += " -g %s" % ( self.job.groupid )
        if self.job.queue != "":
            cmd += " -q %s" % ( self.job.queue )
        cmd += " -s %s" % ( newStatus )
        cmd += " -c %s" % ( self.getConfigFile() )
        cmd += " -v %i" % ( self._verbose )
        return "os.system( \"%s\" )\n" % ( cmd )

    def getCmdToLaunchWrapper( self, fileName, genericCmd, exeWrapper ):
        """
        Return the launching command as a string.
        Launch the wrapper, retrieve its exit status, update status if error.
        On error the generated code moves the job's working dir ('newDir')
        back to the current directory before exiting.
        """
        specificCmd = genericCmd.replace( GENERIC_IN_FILE, fileName )
        cmd = ""
        cmd += "print \"LAUNCH: %s\"\n" % ( specificCmd )
        cmd += "sys.stdout.flush()\n"
        cmd += "exitStatus = os.system ( \"%s\" )\n" % ( specificCmd )
        cmd += "if exitStatus != 0:\n"
        cmd += "\tprint \"ERROR: wrapper '%s'" % ( exeWrapper )
        # literal '%i' / '% ( exitStatus )' belong to the generated code
        cmd += " returned exit status '%i'\" % ( exitStatus )\n"
        cmd += "\tos.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        cmd += "\tshutil.move( newDir, '%s' )\n" % ( self.getCurrentDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        return cmd

    def getCmdToKeepFiles( self, fileName, lFilesToKeep ):
        """
        Return the commands to keep the output files: fail the job if an
        expected output is missing, otherwise copy it to the current
        directory unless a file with that name is already there.
        @param lFilesToKeep: file names (may contain GENERIC_IN_FILE); None
                             is treated as an empty list
        """
        cmd = ""
        for f in lFilesToKeep or []:
            f = f.replace( GENERIC_IN_FILE, fileName )
            cmd += "if not os.path.exists( \"%s\" ):\n" % ( f )
            cmd += "\tprint \"ERROR: output file '%s' doesn't exist\"\n" % ( f )
            cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
            cmd += "\tsys.exit(1)\n"
            cmd += "if not os.path.exists( \"%s/%s\" ):\n" % ( self.getCurrentDirectory(), f )
            cmd += "\tshutil.copy( \"%s\", \"%s/%s\" )\n" % ( f, self.getCurrentDirectory(), f )
        return cmd

    def getCmdToRemoveFiles( self, fileName, lFilesToRemove ):
        """
        Return the commands to remove the temporary files (if they exist).
        @param lFilesToRemove: file names (may contain GENERIC_IN_FILE); None
                               is treated as an empty list
        """
        cmd = ""
        for f in lFilesToRemove or []:
            f = f.replace( GENERIC_IN_FILE, fileName )
            cmd += "if os.path.exists( \"%s\" ):\n" % ( f )
            cmd += "\tos.remove( \"%s\" )\n" % ( f )
        return cmd

    def getJobCommandsAsString( self, fileName, jobName, minFreeGigaInTmpDir=1 ):
        """
        Return all the job commands as a string (a complete Python script).
        The script checks the temporary dir exists and has at least
        'minFreeGigaInTmpDir' GB free, works in a fresh sub-directory,
        runs every program launcher of 'self.lPrgLaunchers' on 'fileName',
        and updates the job status (running / error / finished).
        """
        cmd = "#!/usr/bin/env python\n"
        cmd += "\n"
        cmd += "import os\n"
        cmd += "import sys\n"
        cmd += "import shutil\n"
        cmd += "import time\n"
        cmd += "\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "beginTime = time.time()\n"
        cmd += "print 'beginTime=%f' % ( beginTime )\n"
        cmd += "\n"
        cmd += self.getCmdUpdateJobStatusAsString( "running" )
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % ( self.getTemporaryDirectory() )
        cmd += "\tprint \"ERROR: working dir '%s' doesn't exist\"\n" % ( self.getTemporaryDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        cmd += "freeSpace = os.statvfs( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        # available blocks * block size in bytes, converted to GB, vs threshold
        cmd += "if ( freeSpace.f_bavail * freeSpace.f_frsize ) / 1073741824.0 < %i:\n" % ( minFreeGigaInTmpDir )
        cmd += "\tprint \"ERROR: less than %iGb in '%s'\"\n" % ( minFreeGigaInTmpDir, self.getTemporaryDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        cmd += "print \"working dir: %s\"\n" % ( self.getTemporaryDirectory() )
        cmd += "sys.stdout.flush()\n"
        cmd += "os.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        cmd += "\n"
        cmd += "newDir = \"%s_%s_%s\"\n" % ( self.getGroupIdentifier(), jobName, time.strftime("%Y%m%d-%H%M%S") )
        cmd += "if os.path.exists( newDir ):\n"
        cmd += "\tshutil.rmtree( newDir )\n"
        cmd += "os.mkdir( newDir )\n"
        cmd += "os.chdir( newDir )\n"
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % ( fileName )
        cmd += "\tos.symlink( \"%s/%s\", \"%s\" )\n" % ( self._inputDir, fileName, fileName )
        cmd += "\n"

        for pL in self.lPrgLaunchers:
            cmd += self.getCmdToLaunchWrapper( fileName,
                                               pL.getWrapperCommandLine(),
                                               "%s.py" % ( type(pL).__name__ ) )
            cmd += "\n"
            cmd += self.getCmdToKeepFiles( fileName, pL.getListFilesToKeep() )
            cmd += "\n"
            cmd += self.getCmdToRemoveFiles( fileName, pL.getListFilesToRemove() )

        cmd += "if os.path.exists( \"%s\" ):\n" % ( fileName )
        cmd += "\tos.remove( \"%s\" )\n" % ( fileName )
        cmd += "os.chdir( \"..\" )\n"
        cmd += "shutil.rmtree( newDir )\n"
        cmd += self.getCmdUpdateJobStatusAsString( "finished" )
        cmd += "\n"
        cmd += "endTime = time.time()\n"
        cmd += "print 'endTime=%f' % ( endTime)\n"
        cmd += "print 'executionTime=%f' % ( endTime - beginTime )\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "sys.exit(0)\n"
        return cmd

    def getStatsOfExecutionTime( self ):
        """
        Return a Stat object about the execution time of each job, i.e. the
        duration in seconds read from the first 'executionTime=<float>' line
        of each job stdout file '<currentDir>/<acronym>*.o*'.
        """
        stat = Stat()
        pattern = "%s/%s*.o*" % ( self.getCurrentDirectory(), self.getAcronym() )
        for jobStdout in glob.glob( pattern ):
            with open( jobStdout, "r" ) as fH:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores surrounding whitespace, so this
                        # works whether or not the line ends with '\n'
                        stat.add( float( line.split("=")[1] ) )
                        break
        return stat

    def formatGroupidAndTime(self):
        """Return '<groupid> <YYYY-mm-dd HH:MM:SS>' for progress messages."""
        return self.job.groupid + " " + time.strftime("%Y-%m-%d %H:%M:%S")

    def submitJob(self, lInFiles):
        """
        Submit one job per input file through 'self.jobdb', pausing 0.5 s
        before each submission. Exit(1) as soon as a submission returns a
        non-zero status.
        NOTE(review): initializeJob() is not defined in this class;
        presumably provided by subclasses.
        """
        for count, inFile in enumerate(lInFiles, 1):
            fileName = os.path.basename(inFile)
            if self._verbose > 1:
                print("processing '%s' # %i..." % (fileName, count))
                sys.stdout.flush()

            self.initializeJob(fileName, count)
            time.sleep(0.5)
            exitStatus = self.jobdb.submitJob(self.job)
            if exitStatus != 0:
                print("ERROR while submitting job '%i' to the cluster" % (count))
                sys.exit(1)

    def checkClusterLauncherAttributes( self ):
        """
        Check the generic attributes; raise CheckerException on the first
        problem. Default the temporary directory to the current directory.
        """
        if self.getInputDirectory() == "":
            message = "ERROR: missing input directory"
            raise CheckerException(message)
        if not os.path.exists( self.getInputDirectory() ):
            message = "ERROR: input directory '%s' doesn't exist" % ( self.getInputDirectory() )
            raise CheckerException(message)
        if self.getGroupIdentifier() == "":
            message = "ERROR: missing group identifier"
            raise CheckerException(message)
        if self.getAcronym() == "":
            message = "ERROR: missing program acronym"
            raise CheckerException(message)
        if self.getConfigFile() == "":
            message = "ERROR: missing config file to access MySQL"
            raise CheckerException(message)
        if not os.path.exists( self.getConfigFile() ):
            message = "ERROR: config file '%s' doesn't exist" % ( self.getConfigFile() )
            raise CheckerException(message)
        if self.getTemporaryDirectory() == "":
            self.setTemporaryDirectory(self._currentDir)

    def checkGenericAttributes( self ):
        self.checkClusterLauncherAttributes()

    def checkProgramLauncherAttributes( self ):
        self.getProgramLauncherInstance().checkSpecificAttributes()

    def checkSpecificAttributes( self ):
        self.checkProgramLauncherAttributes()

    def start( self ):
        """
        Prepare the launch: register the program launcher(s), check their
        command lines and attributes, fill the Job template (table, groupid,
        queue and resources), open the RepetJob connection and clean the job
        group. If the group still has unfinished jobs, wait for them and
        return early. Exit(1) on any configuration error.
        """
        if self.lPrgLaunchers == []:
            self.setSingleProgramLauncher()
        for pL in self.lPrgLaunchers:
            if pL.getWrapperCommandLine() == "":
                string = "ERROR: wrapper command is empty !"
                print(string)
                sys.exit(1)
            if pL.getProgramCommandLine() == "":
                string = "ERROR: program command is empty !"
                print(string)
                sys.exit(1)
            # NOTE(review): checkProgramAvailability() is not defined in this
            # class; presumably provided by subclasses.
            self.checkProgramAvailability()

        try:
            self.checkProgramLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        if self.getVerbosityLevel() > 0:
            string = "START %s" % ( type(self).__name__ )
            print(string)
        self.job.tablename = self.getJobTableName()
        self.job.groupid = self.getGroupIdentifier()
        # queue string may be "'queue res1 res2'": first token is the queue,
        # the remaining tokens are resources
        tokens = self.getQueueName().replace("'","").split(" ")
        self.job.setQueue( tokens[0] )
        if len(tokens) > 1:
            self.job.lResources = tokens[1:]
        if self.getVerbosityLevel() > 0:
            print("groupid: %s" % ( self.getGroupIdentifier() ))
        self.jobdb = RepetJob( cfgFileName=self.getConfigFile() )
        if self.jobdb.hasUnfinishedJob( self.job.tablename, self.job.groupid ):
            self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid )
            return
        self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid )
        sys.stdout.flush()

    def end( self ):
        """
        Optionally clean job files/outputs and concatenate output files,
        then close the RepetJob connection.
        NOTE(review): removeAllJobFiles, removeAllJobStdouts,
        removeAllJobStderrs and catOutputFiles are not defined in this class;
        presumably provided by subclasses.
        """
        if self.getClean():
            self.removeAllJobFiles()
            self.removeAllJobStdouts()
            self.removeAllJobStderrs()

        if self.getCatOutputFiles():
            self.catOutputFiles()

        self.jobdb.close()

        if self.getVerbosityLevel() > 0:
            string = "END %s" % ( type(self).__name__ )
            print(string)
            sys.stdout.flush()

    def run( self ):
        """
        Check attributes, submit one job per input file, wait for the whole
        job group, report execution times (when verbose) and clean up.
        """
        try:
            self.checkClusterLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        self.start()

        lInFiles = self.getInputFilesList()
        self._nbJobs = len(lInFiles)

        if self._verbose > 0:
            string = "submitting " + str(self._nbJobs) + " jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.submitJob(lInFiles)

        if self._verbose > 0:
            string = "waiting for jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid )

        if self._verbose > 0:
            string = "all jobs completed ! " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()
            statsExecutionTime = self.getStatsOfExecutionTime()
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
            sys.stdout.flush()

        self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid )

        self.end()
605
606