Mercurial > repos > yufei-luo > s_mart
view commons/pyRepetUnit/components/AbstractClusterLauncher.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

import getopt
import time
import glob
import sys
import os

from commons.core.checker.CheckerException import CheckerException
from commons.core.sql.RepetJob import RepetJob
from commons.core.sql.Job import Job
from commons.core.stat.Stat import Stat
from pyRepet.launcher.AbstractProgramLauncher import AbstractProgramLauncher

# Placeholder for the input file name inside generic command lines and
# file names; it is substituted with the real file name (or with "*" to
# build a glob pattern) via str.replace().
GENERIC_IN_FILE = "zDUMMYz"


## Abstract class to launch a program in parallel on a cluster.
#
class AbstractClusterLauncher( object ): #( IClusterLauncher )
    """
    Base class that splits a task into one cluster job per input file.

    For every file matching '<inputDir>/*.<suffix>' a job is initialised,
    submitted through a RepetJob instance and tracked in a job table; the
    launcher then waits for the whole group of jobs to complete.

    The following methods are called by this class but are not defined in
    it: checkProgramAvailability(), initializeJob(), removeAllJobFiles(),
    removeAllJobStdouts(), removeAllJobStderrs() and catOutputFiles().
    NOTE(review): presumably they are supplied by the concrete launchers;
    confirm against the subclasses.

    The code is written so that it parses under both Python 2.6+ and
    Python 3 ('print(...)' with a single argument, 'except ... as ...').
    The job scripts it generates still use the Python 2 print statement.
    """

    def __init__( self ):
        """
        Constructor.
        """
        self._inputDir = ""             # path to the directory with input files
        self._queueName = ""            # name of the queue on the cluster
        self._groupId = ""              # identifier of the group of jobs (groupid)
        self._inputFileSuffix = "fa"    # suffix of the input files (default='fa')
        self._prgAcronym = ""           # acronym of the program to launch
        self._configFile = ""           # name of the configuration file (connect to MySQL)
        self._currentDir = os.getcwd()  # path to the current directory
        self._tmpDir = ""               # path to the temporary directory
        self._jobTable = "jobs"         # name of the table recording the jobs
        self._catOutFiles = False       # concatenate output files of all jobs
        self._clean = False             # clean job file, job stdout, job table...
        self._verbose = 1               # verbosity level
        self.jobdb = None               # RepetJob instance
        self.job = Job()                # Job instance
        self._nbJobs = 0
        self._cmdLineGenericOptions = "hi:Q:g:S:a:C:d:j:Zcv:"
        self._cmdLineSpecificOptions = ""
        self._exeWrapper = "AbstractProgramLauncher.py"
        self._prgLauncher = None
        # list of instances derived from AbstractProgramLauncher()
        # If several programs are launched successively in the same job,
        # 'lPrgLaunchers' has to be filled before run().
        self.lPrgLaunchers = []

    # ------------------------------------------------------------------
    # command-line handling
    # ------------------------------------------------------------------

    def setProgramLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Forward the option 'o' (with argument 'a') to the program launcher.
        """
        self.getProgramLauncherInstance().setASpecificAttributeFromCmdLine(o, a)

    def setClusterLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Set the attribute matching the generic option 'o'.

        Unknown options are silently ignored. '-h' prints the help and
        exits with status 0.
        """
        if o == "-h":
            print(self.getHelpAsString())
            sys.exit(0)
        elif o == "-i":
            self.setInputDirectory(a)
        elif o == "-Q":
            self.setQueueName(a)
        elif o == "-g":
            self.setGroupIdentifier(a)
        elif o == "-S":
            self.setInputFileSuffix(a)
        elif o == "-a":
            self.setAcronym(a)
        elif o == "-C":
            self.setConfigFile(a)
        elif o == "-d":
            self.setTemporaryDirectory(a)
        elif o == "-j":
            self.setJobTableName(a)
        elif o == "-Z":
            self.setCatOutputFiles()
        elif o == "-c":
            self.setClean()
        elif o == "-v":
            self.setVerbosityLevel(a)

    def setAttributesFromCmdLine(self):
        """
        Parse sys.argv and dispatch every option to both launchers.

        On a getopt error, print the error and the help, then exit with
        status 1.
        """
        try:
            opts, _ = getopt.getopt(sys.argv[1:], self.getCmdLineOptions())
        except getopt.GetoptError as err:
            print(str(err))
            print(self.getHelpAsString())
            sys.exit(1)
        for o, a in opts:
            # each option is offered to the cluster launcher and to the
            # program launcher; each one picks the options it knows
            self.setClusterLauncherAttributeFromCmdLine(o, a)
            self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setAGenericAttributeFromCmdLine( self, o, a="" ):
        """
        Alias of setClusterLauncherAttributeFromCmdLine().
        """
        self.setClusterLauncherAttributeFromCmdLine(o, a)

    def setASpecificAttributeFromCmdLine( self, o, a="" ):
        """
        Alias of setProgramLauncherAttributeFromCmdLine().
        """
        self.setProgramLauncherAttributeFromCmdLine(o, a)

    # ------------------------------------------------------------------
    # setters
    # ------------------------------------------------------------------

    def setInputDirectory( self, arg ):
        self._inputDir = arg

    def setQueueName( self, arg ):
        self._queueName = arg

    def setGroupIdentifier( self, arg ):
        self._groupId = arg

    def setInputFileSuffix( self, arg ):
        self._inputFileSuffix = arg

    def setAcronym( self, arg ):
        self._prgAcronym = arg

    def setConfigFile( self, arg ):
        """
        Record the configuration file; a bare file name (no directory
        part) is prefixed with the current working directory.
        """
        if os.path.dirname( arg ) == "":
            self._configFile = "%s/%s" % ( os.getcwd(), arg )
        else:
            self._configFile = arg

    # NOTE: the default value is computed once, when the class body is
    # executed, not at each call. It is kept as is so that the signature
    # seen by callers does not change.
    def setCurrentDirectory( self, arg =os.getcwd()):
        self._currentDir = arg

    def setTemporaryDirectory( self, arg ):
        self._tmpDir = arg

    def setJobTableName( self, arg ):
        self._jobTable = arg

    def setCatOutputFiles( self ):
        self._catOutFiles = True

    def setClean( self):
        self._clean = True

    def setVerbosityLevel( self, arg ):
        """
        Set the verbosity level; 'arg' is converted with int().
        """
        self._verbose = int(arg)

    def setExecutableWrapper( self, arg = "AbstractProgramLauncher.py" ):
        self._exeWrapper = arg

    def setSingleProgramLauncher( self ):
        """
        Set the wrapper and program command-lines of the program launcher.
        Append the program launcher to 'self.lPrgLaunchers'.
        """
        launcher = self.getProgramLauncherInstance()
        launcher.setWrapperCommandLine()
        launcher.setProgramCommandLine()
        self.lPrgLaunchers.append( launcher )

    # ------------------------------------------------------------------
    # help
    # ------------------------------------------------------------------

    def getGenericHelpAsString( self ):
        """
        Return the usage line and the description of the generic options.
        """
        lines = [
            "usage: %s.py [options]" % ( type(self).__name__ ),
            "\ngeneric options:",
            "\n -h: this help",
            "\n -i: directory with input files (absolute path)",
            "\n -Q: name of the queue on the cluster",
            "\n -g: identifier of the group of jobs (groupid)",
            "\n -S: suffix of the input files (default='fa')",
            "\n -a: acronym of the program to be launched (default='%s')" % ( self.getAcronym() ),
            "\n -C: configuration file to connect to MySQL (absolute path or in current dir)",
            "\n -d: temporary directory (absolute path, default=None)",
            "\n -j: table recording the jobs (default='jobs')",
            # '-Z' is accepted by the option string and handled by
            # setClusterLauncherAttributeFromCmdLine(), so it is listed too
            "\n -Z: concatenate the output files of all jobs",
            "\n -c: clean the temporary data",
            "\n -v: verbosity level (default=0/1/2)",
        ]
        return "".join( lines )

    def getSpecificHelpAsString( self ):
        """
        Return the description of the program-specific options.
        Specified in each wrapper; empty here so that getHelpAsString()
        can always concatenate it.
        """
        return ""

    def getHelpAsString(self):
        """
        Return the generic help followed by the specific help.
        """
        # an override may still return None: treat it as "no specific help"
        specificHelp = self.getSpecificHelpAsString() or ""
        return self.getGenericHelpAsString() + specificHelp

    # ------------------------------------------------------------------
    # getters
    # ------------------------------------------------------------------

    def getInputDirectory( self ):
        return self._inputDir

    def getQueueName( self ):
        return self._queueName

    def getGroupIdentifier( self ):
        return self._groupId

    def getInputFileSuffix( self ):
        return self._inputFileSuffix

    def getAcronym( self ):
        return self._prgAcronym

    def getConfigFile( self ):
        return self._configFile

    def getCurrentDirectory( self ):
        return self._currentDir

    def getTemporaryDirectory( self ):
        return self._tmpDir

    def getJobTableName( self ):
        return self._jobTable

    def getCatOutputFiles( self ):
        return self._catOutFiles

    def getClean( self ):
        return self._clean

    def getVerbosityLevel( self ):
        return self._verbose

    def getWrapperName( self ):
        return self.getProgramLauncherInstance().getWrapperName()

    def getProgramName( self ):
        return self.getProgramLauncherInstance().getProgramName()

    def getPatternToConcatenate( self ):
        """
        Return the output file name of the program launcher with the
        generic input file name replaced by '*'.
        """
        return self.getProgramLauncherInstance().getOutputFile().replace( GENERIC_IN_FILE, "*" )

    def getProgramLauncherInstance( self ):
        """
        Return the program launcher, creating an AbstractProgramLauncher
        on first use if none has been set.
        """
        if self._prgLauncher is None:
            self._prgLauncher = AbstractProgramLauncher()
        return self._prgLauncher

    def getInputFilesList(self):
        """
        Return the paths matching '<inputDir>/*.<inputFileSuffix>'.
        """
        return glob.glob("%s/*.%s" % (self._inputDir, self._inputFileSuffix))

    def getCmdLineOptions(self):
        """
        Return the getopt option string: generic options followed by the
        specific ones (empty unless '_cmdLineSpecificOptions' was set).
        """
        return self._cmdLineGenericOptions + self._cmdLineSpecificOptions

    def getProgramCommandLineAsString( self ):
        """
        Return the command-line to launch in each job.
        Specified in each wrapper.
        """
        pass

    def getListFilesToKeep( self ):
        """
        Return the list of files to keep at the end of each job.
        Specified in each wrapper.
        """
        pass

    def getListFilesToRemove( self ):
        """
        Return the list of files to remove at the end of each job.
        Specified in each wrapper.
        """
        pass

    # ------------------------------------------------------------------
    # generation of the job script
    # ------------------------------------------------------------------

    def getJobFileNameAsString( self, count ):
        """
        Return the name of the job file as a string.
        @param count: job number (e.g. '1') or '*'
        @type count: integer or string
        """
        jobFile = "ClusterLauncher"
        jobFile += "_groupid%s" % ( self.getGroupIdentifier() )
        if count != "*":
            # int() so that a job number given as a string ('1') works too
            jobFile += "_job%i" % ( int(count) )
            jobFile += "_time%s" % ( time.strftime("%Y-%m-%d-%H-%M-%S") )
        else:
            jobFile += "_job*"
            jobFile += "_time%s-*" % ( time.strftime("%Y-%m") )
        jobFile += ".py"
        return jobFile

    def getCmdUpdateJobStatusAsString( self, newStatus ):
        """
        Return the command to update the job status in the table.

        The result is one line of Python source ('os.system( "..." )')
        meant to be embedded in the generated job script. Raises KeyError
        if the environment variable REPET_PATH is not set.
        """
        prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py"
        cmd = prg
        cmd += " -t %s" % ( self.job.tablename )
        # NOTE(review): when 'jobid' is all digits the job *name* is passed
        # to '-j', otherwise the job id itself; presumably a numeric id is
        # not usable as identifier here -- confirm against Job/RepetJob.
        if str(self.job.jobid).isdigit():
            cmd += " -j %s" % ( self.job.jobname )
        else:
            cmd += " -j %s" % ( self.job.jobid )
        cmd += " -g %s" % ( self.job.groupid )
        if self.job.queue != "":
            cmd += " -q %s" % ( self.job.queue )
        cmd += " -s %s" % ( newStatus )
        cmd += " -c %s" % ( self.getConfigFile() )
        cmd += " -v %i" % ( self._verbose )
        return "os.system( \"%s\" )\n" % ( cmd )

    def getCmdToLaunchWrapper( self, fileName, genericCmd, exeWrapper ):
        """
        Return the launching command as a string.
        Launch the wrapper, retrieve its exit status, update status if error.

        @param fileName: name substituted for the generic input file name
        @param genericCmd: wrapper command-line containing the generic name
        @param exeWrapper: wrapper name, used in the error message only
        """
        specificCmd = genericCmd.replace( GENERIC_IN_FILE, fileName )
        cmd = ""
        cmd += "print \"LAUNCH: %s\"\n" % ( specificCmd )
        cmd += "sys.stdout.flush()\n"
        cmd += "exitStatus = os.system ( \"%s\" )\n" % ( specificCmd )
        cmd += "if exitStatus != 0:\n"
        cmd += "\tprint \"ERROR: wrapper '%s'" % ( exeWrapper )
        # the '%i' below is left for the generated script to format
        cmd += " returned exit status '%i'\" % ( exitStatus )\n"
        cmd += "\tos.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        cmd += "\tshutil.move( newDir, '%s' )\n" % ( self.getCurrentDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        return cmd

    def getCmdToKeepFiles( self, fileName, lFilesToKeep ):
        """
        Return the commands to keep the output files.

        For each file: fail the job if it is missing, otherwise copy it to
        the current directory unless it is already there. 'lFilesToKeep'
        may be None or empty, in which case an empty string is returned.
        """
        cmd = ""
        for f in lFilesToKeep or []:
            f = f.replace( GENERIC_IN_FILE, fileName )
            cmd += "if not os.path.exists( \"%s\" ):\n" % ( f )
            cmd += "\tprint \"ERROR: output file '%s' doesn't exist\"\n" % ( f )
            cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
            cmd += "\tsys.exit(1)\n"
            cmd += "if not os.path.exists( \"%s/%s\" ):\n" \
                   % ( self._currentDir, f )
            cmd += "\tshutil.copy( \"%s\", \"%s/%s\" )\n" % ( f, self.getCurrentDirectory(), f )
        return cmd

    def getCmdToRemoveFiles( self, fileName, lFilesToRemove ):
        """
        Return the commands to remove the temporary files.

        'lFilesToRemove' may be None or empty, in which case an empty
        string is returned.
        """
        cmd = ""
        for f in lFilesToRemove or []:
            f = f.replace( GENERIC_IN_FILE, fileName )
            cmd += "if os.path.exists( \"%s\" ):\n" % ( f )
            cmd += "\tos.remove( \"%s\" )\n" % ( f )
        return cmd

    def getJobCommandsAsString( self, fileName, jobName, minFreeGigaInTmpDir=1 ):
        """
        Return all the job commands as a string.

        The string is a complete Python script which: marks the job as
        running, checks the temporary directory and its free space, works
        in a fresh sub-directory, links the input file, runs every program
        launcher of 'lPrgLaunchers', cleans up and marks the job finished.

        @param fileName: name of the input file (without directory)
        @param jobName: job name, used in the working directory name
        @param minFreeGigaInTmpDir: minimum free space required, in Gb
        """
        cmd = "#!/usr/bin/env python\n"
        cmd += "\n"
        cmd += "import os\n"
        cmd += "import sys\n"
        cmd += "import shutil\n"
        cmd += "import time\n"
        cmd += "\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "beginTime = time.time()\n"
        cmd += "print 'beginTime=%f' % ( beginTime )\n"
        cmd += "\n"
        cmd += self.getCmdUpdateJobStatusAsString( "running" )
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % ( self.getTemporaryDirectory() )
        cmd += "\tprint \"ERROR: working dir '%s' doesn't exist\"\n" % ( \
            self.getTemporaryDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        cmd += "freeSpace = os.statvfs( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        # nb of available blocks * block size in bytes, compared in GigaBytes
        cmd += "if ( freeSpace.f_bavail * freeSpace.f_frsize ) / 1073741824.0 < %i:\n" % ( minFreeGigaInTmpDir )
        cmd += "\tprint \"ERROR: less than %iGb in '%s'\"\n" % ( minFreeGigaInTmpDir, self.getTemporaryDirectory() )
        cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) )
        cmd += "\tsys.exit(1)\n"
        cmd += "print \"working dir: %s\"\n" % ( self.getTemporaryDirectory() )
        cmd += "sys.stdout.flush()\n"
        cmd += "os.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() )
        cmd += "\n"
        cmd += "newDir = \"%s_%s_%s\"\n" % ( self.getGroupIdentifier(), jobName, time.strftime("%Y%m%d-%H%M%S") )
        cmd += "if os.path.exists( newDir ):\n"
        cmd += "\tshutil.rmtree( newDir )\n"
        cmd += "os.mkdir( newDir )\n"
        cmd += "os.chdir( newDir )\n"
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % ( fileName )
        cmd += "\tos.symlink( \"%s/%s\", \"%s\" )\n" % \
               ( self._inputDir, fileName, fileName )
        cmd += "\n"

        for pL in self.lPrgLaunchers:
            cmd += self.getCmdToLaunchWrapper( \
                fileName, \
                pL.getWrapperCommandLine(), \
                "%s.py" % ( type(pL).__name__ ) )
            cmd += "\n"
            cmd += self.getCmdToKeepFiles( fileName, pL.getListFilesToKeep() )
            cmd += "\n"
            cmd += self.getCmdToRemoveFiles( fileName, \
                                             pL.getListFilesToRemove() )

        # the input link is removed once, after all the launchers have run
        cmd += "if os.path.exists( \"%s\" ):\n" % ( fileName )
        cmd += "\tos.remove( \"%s\" )\n" % ( fileName )
        cmd += "os.chdir( \"..\" )\n"
        cmd += "shutil.rmtree( newDir )\n"
        cmd += self.getCmdUpdateJobStatusAsString( "finished" )
        cmd += "\n"
        cmd += "endTime = time.time()\n"
        cmd += "print 'endTime=%f' % ( endTime)\n"
        cmd += "print 'executionTime=%f' % ( endTime - beginTime )\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "sys.exit(0)\n"
        return cmd

    # ------------------------------------------------------------------
    # statistics
    # ------------------------------------------------------------------

    def getStatsOfExecutionTime( self ):
        """
        Return a Stat object filled with the execution time of each job.

        The files '<currentDir>/<acronym>*.o*' are scanned; in each one
        the value following '=' on the first line containing
        'executionTime' is added to the Stat object as a float.
        """
        stat = Stat()
        pattern = "%s/%s*.o*" % ( self.getCurrentDirectory(), \
                                  self.getAcronym() )
        for jobFile in glob.glob( pattern ):
            # 'with' closes the file even if the value cannot be parsed
            with open( jobFile, "r" ) as fH:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores the trailing newline, so a last
                        # line without one does not lose its final digit
                        stat.add( float( line.split("=")[1] ) )
                        break
        return stat

    def formatGroupidAndTime(self):
        """
        Return '<groupid> <YYYY-mm-dd HH:MM:SS>' for progress messages.
        """
        return self.job.groupid + " " + time.strftime("%Y-%m-%d %H:%M:%S") + ""

    # ------------------------------------------------------------------
    # submission and checks
    # ------------------------------------------------------------------

    def submitJob(self, lInFiles):
        """
        Initialise and submit one job per input file.

        Exits with status 1 as soon as a submission returns a non-zero
        status.
        """
        for count, inFile in enumerate(lInFiles, 1):
            fileName = os.path.basename(inFile)
            if self._verbose > 1:
                print("processing '%s' # %i..." % (fileName, count))
                sys.stdout.flush()
            self.initializeJob(fileName, count)
            time.sleep(0.5)
            exitStatus = self.jobdb.submitJob(self.job)
            if exitStatus != 0:
                print("ERROR while submitting job '%i' to the cluster" % (count))
                sys.exit(1)

    def checkClusterLauncherAttributes( self ):
        """
        Check the generic attributes; raise CheckerException on the first
        missing or invalid one. An empty temporary directory defaults to
        the current directory.
        """
        if self.getInputDirectory() == "":
            message = "ERROR: missing input directory"
            raise CheckerException(message)
        if not os.path.exists( self.getInputDirectory() ):
            message = "ERROR: input directory '%s' doesn't exist" % ( self.getInputDirectory() )
            raise CheckerException(message)
        if self.getGroupIdentifier() == "":
            message = "ERROR: missing group identifier"
            raise CheckerException(message)
        if self.getAcronym() == "":
            message = "ERROR: missing program acronym"
            raise CheckerException(message)
        if self.getConfigFile() == "":
            message = "ERROR: missing config file to access MySQL"
            raise CheckerException(message)
        if not os.path.exists( self.getConfigFile() ):
            message = "ERROR: config file '%s' doesn't exist" % ( self.getConfigFile() )
            raise CheckerException(message)
        if self.getTemporaryDirectory() == "":
            self.setTemporaryDirectory(self._currentDir)

    def checkGenericAttributes( self ):
        """
        Alias of checkClusterLauncherAttributes().
        """
        self.checkClusterLauncherAttributes()

    def checkProgramLauncherAttributes( self ):
        """
        Delegate the check of the specific attributes to the program launcher.
        """
        self.getProgramLauncherInstance().checkSpecificAttributes()

    def checkSpecificAttributes( self ):
        """
        Alias of checkProgramLauncherAttributes().
        """
        self.checkProgramLauncherAttributes()

    # ------------------------------------------------------------------
    # life cycle
    # ------------------------------------------------------------------

    def start( self ):
        """
        Prepare the launch: check the program launchers, fill the Job
        instance, connect to the job table and clean the job group.

        Exits with status 1 if a command-line is empty or if the specific
        attributes are invalid.
        """
        if self.lPrgLaunchers == []:
            self.setSingleProgramLauncher()
        for pL in self.lPrgLaunchers:
            if pL.getWrapperCommandLine() == "":
                string = "ERROR: wrapper command is empty !"
                print(string)
                sys.exit(1)
            if pL.getProgramCommandLine() == "":
                string = "ERROR: program command is empty !"
                print(string)
                sys.exit(1)
        # NOTE(review): assumed to run once, after the loop -- confirm
        self.checkProgramAvailability()

        try:
            self.checkProgramLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        if self.getVerbosityLevel() > 0:
            string = "START %s" % ( type(self).__name__ )
            print(string)
        self.job.tablename = self.getJobTableName()
        self.job.groupid = self.getGroupIdentifier()
        # the queue name may be followed by space-separated resources
        tokens = self.getQueueName().replace("'","").split(" ")
        self.job.setQueue( tokens[0] )
        if len(tokens) > 1:
            lResources = tokens[1:]
            self.job.lResources = lResources
        if self.getVerbosityLevel() > 0:
            print("groupid: %s" % ( self.getGroupIdentifier() ))
        self.jobdb = RepetJob( cfgFileName=self.getConfigFile() )
        # NOTE(review): the early return is assumed to belong to the 'if'
        # (wait for jobs already in progress, and do not clean them)
        if self.jobdb.hasUnfinishedJob( self.job.tablename, \
                                        self.job.groupid ):
            self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid )
            return
        self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid )
        sys.stdout.flush()

    def end( self ):
        """
        Optionally remove the job files and concatenate the outputs, then
        close the connection to the job table.
        """
        if self.getClean():
            self.removeAllJobFiles()
            self.removeAllJobStdouts()
            self.removeAllJobStderrs()

        if self.getCatOutputFiles():
            self.catOutputFiles()

        self.jobdb.close()

        if self.getVerbosityLevel() > 0:
            string = "END %s" % ( type(self).__name__ )
            print(string)
        sys.stdout.flush()

    def run( self ):
        """
        Check the attributes, submit one job per input file, wait for the
        group of jobs, report the execution times and clean up.

        Exits with status 1 if the generic attributes are invalid.
        """
        try:
            self.checkClusterLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)
        self.start()
        lInFiles = self.getInputFilesList()
        self._nbJobs = len(lInFiles)

        if self._verbose > 0:
            string = "submitting " + str(self._nbJobs) + " jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.submitJob(lInFiles)

        if self._verbose > 0:
            string = "waiting for jobs... " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()

        self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid )

        if self._verbose > 0:
            string = "all jobs completed ! " + self.formatGroupidAndTime()
            print(string)
            sys.stdout.flush()
            # NOTE(review): the statistics are assumed to be part of the
            # verbose report -- confirm against the original indentation
            statsExecutionTime = self.getStatsOfExecutionTime()
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
            sys.stdout.flush()

        self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid )

        self.end()