Mercurial > repos > yufei-luo > s_mart
diff commons/pyRepetUnit/components/AbstractClusterLauncher.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/pyRepetUnit/components/AbstractClusterLauncher.py Mon Apr 29 03:20:15 2013 -0400 @@ -0,0 +1,606 @@ +#!/usr/bin/env python + +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + +import getopt +import time +import glob +import sys +import os + +from commons.core.checker.CheckerException import CheckerException +from commons.core.sql.RepetJob import RepetJob +from commons.core.sql.Job import Job +from commons.core.stat.Stat import Stat +from pyRepet.launcher.AbstractProgramLauncher import AbstractProgramLauncher + +GENERIC_IN_FILE = "zDUMMYz" + + +## Abstract class to launch a program in parallel on a cluster. +# +class AbstractClusterLauncher( object ): #( IClusterLauncher ) + + def __init__( self ): + """ + Constructor. + """ + self._inputDir = "" # path to the directory with input files + self._queueName = "" # name of the queue on the cluster + self._groupId = "" # identifier of the group of jobs (groupid) + self._inputFileSuffix = "fa" # suffix of the input files (default='fa') + self._prgAcronym = "" # acronym of the program to launch + self._configFile = "" # name of the configuration file (connect to MySQL) + self._currentDir = os.getcwd() # path to the current directory + self._tmpDir = "" # path to the temporary directory + self._jobTable = "jobs" # name of the table recording the jobs + self._catOutFiles = False # concatenate output files of all jobs + self._clean = False # clean job file, job stdout, job table... + self._verbose = 1 # verbosity level + self.jobdb = None # RepetJob instance + self.job = Job() # Job instance + + self._nbJobs = 0 + self._cmdLineGenericOptions = "hi:Q:g:S:a:C:d:j:Zcv:" + self._cmdLineSpecificOptions = "" + + self._exeWrapper = "AbstractProgramLauncher.py" + self._prgLauncher = None + # list of instances derived from AbstractProgramLauncher() + # If several program are launched successively in the same job, + # 'lPrgLaunchers' has to be filled before run(). + self.lPrgLaunchers = [] + + def setProgramLauncherAttributeFromCmdLine(self, o, a=""): + self.getProgramLauncherInstance().setASpecificAttributeFromCmdLine(o, a) + + def setClusterLauncherAttributeFromCmdLine(self, o, a=""): + if o == "-h": + print self.getHelpAsString() + sys.exit(0) + elif o == "-i": + self.setInputDirectory(a) + elif o == "-Q": + self.setQueueName(a) + elif o == "-g": + self.setGroupIdentifier(a) + elif o == "-S": + self.setInputFileSuffix(a) + elif o == "-a": + self.setAcronym(a) + elif o == "-C": + self.setConfigFile(a) + elif o == "-d": + self.setTemporaryDirectory(a) + elif o == "-j": + self.setJobTableName(a) + elif o == "-Z": + self.setCatOutputFiles() + elif o == "-c": + self.setClean() + elif o == "-v": + self.setVerbosityLevel(a) + + def setAttributesFromCmdLine(self): + try: + opts, args = getopt.getopt(sys.argv[1:], self.getCmdLineOptions()) + except getopt.GetoptError, err: + print str(err); + print self.getHelpAsString() + sys.exit(1) + for o, a in opts: + self.setClusterLauncherAttributeFromCmdLine(o, a) + self.setProgramLauncherAttributeFromCmdLine(o, a) + + def setAGenericAttributeFromCmdLine( self, o, a="" ): + self.setClusterLauncherAttributeFromCmdLine(o, a) + + def setASpecificAttributeFromCmdLine( self, o, a="" ): + self.setProgramLauncherAttributeFromCmdLine(o, a) + + def setInputDirectory( self, arg ): + self._inputDir = arg + + def setQueueName( self, arg ): + self._queueName = arg + + def setGroupIdentifier( self, arg ): + self._groupId = arg + + def setInputFileSuffix( self, arg ): + self._inputFileSuffix = arg + + def setAcronym( self, arg ): + self._prgAcronym = arg + + def setConfigFile( self, arg ): + if os.path.dirname( arg ) == "": + self._configFile = "%s/%s" % ( os.getcwd(), arg ) + else: + self._configFile = arg + + def setCurrentDirectory( self, arg =os.getcwd()): + self._currentDir = arg + + def setTemporaryDirectory( self, arg ): + self._tmpDir = arg + + def setJobTableName( self, arg ): + self._jobTable = arg + + def setCatOutputFiles( self ): + self._catOutFiles = True + + def setClean( self): + self._clean = True + + def setVerbosityLevel( self, arg ): + self._verbose = int(arg) + + def setExecutableWrapper( self, arg = "AbstractProgramLauncher.py" ): + self._exeWrapper = arg + + def setSingleProgramLauncher( self ): + """ + Set the wrapper and program command-lines of the program launcher. + Append the program launcher to 'self.lPrgLaunchers'. + """ + self.getProgramLauncherInstance().setWrapperCommandLine() + self.getProgramLauncherInstance().setProgramCommandLine() + self.lPrgLaunchers.append( self.getProgramLauncherInstance() ) + + def getGenericHelpAsString( self ): + string = "" + string += "usage: %s.py [options]" % (type(self).__name__ ) + string += "\ngeneric options:" + string += "\n -h: this help" + string += "\n -i: directory with input files (absolute path)" + string += "\n -Q: name of the queue on the cluster" + string += "\n -g: identifier of the group of jobs (groupid)" + string += "\n -S: suffix of the input files (default='fa')" + string += "\n -a: acronym of the program to be launched (default='%s')" % ( self.getAcronym() ) + string += "\n -C: configuration file to connect to MySQL (absolute path or in current dir)" + string += "\n -d: temporary directory (absolute path, default=None)" + string += "\n -j: table recording the jobs (default='jobs')" + string += "\n -c: clean the temporary data" + string += "\n -v: verbosity level (default=0/1/2)" + return string + + def getSpecificHelpAsString( self ): + pass + + def getHelpAsString(self): + return self.getGenericHelpAsString() + self.getSpecificHelpAsString() + + def getInputDirectory( self ): + return self._inputDir + + def getQueueName( self ): + return self._queueName + + def getGroupIdentifier( self ): + return self._groupId + + def getInputFileSuffix( self ): + return self._inputFileSuffix + + def getAcronym( self ): + return self._prgAcronym + + def getConfigFile( self ): + return self._configFile + + def getCurrentDirectory( self ): + return self._currentDir + + def getTemporaryDirectory( self ): + return self._tmpDir + + def getJobTableName( self ): + return self._jobTable + + def getCatOutputFiles( self ): + return self._catOutFiles + + def getClean( self ): + return self._clean + + def getVerbosityLevel( self ): + return self._verbose + + def getWrapperName( self ): + return self.getProgramLauncherInstance().getWrapperName() + + def getProgramName( self ): + return self.getProgramLauncherInstance().getProgramName() + + def getPatternToConcatenate( self ): + return self.getProgramLauncherInstance().getOutputFile().replace( GENERIC_IN_FILE, "*" ) + + def getProgramLauncherInstance( self ): + if self._prgLauncher == None: + self._prgLauncher = AbstractProgramLauncher() + return self._prgLauncher + + def getInputFilesList(self): + lInFiles = glob.glob("%s/*.%s" % (self._inputDir, self._inputFileSuffix)) + return lInFiles + + def getCmdLineOptions(self): + return "hi:Q:g:S:a:C:d:j:Zcv:" + + + def getProgramCommandLineAsString( self ): + """ + Return the command-line to launch in each job. + Specified in each wrapper. + """ + pass + + + def getListFilesToKeep( self ): + """ + Return the list of files to keep at the end of each job. + Specified in each wrapper. + """ + pass + + + def getListFilesToRemove( self ): + """ + Return the list of files to remove at the end of each job. + Specified in each wrapper. + """ + pass + + + def getJobFileNameAsString( self, count ): + """ + Return the name of the job file as a string. + @param count: job number (e.g. '1') or '*' + @type count: integer or string + """ + jobFile = "ClusterLauncher" + jobFile += "_groupid%s" % ( self.getGroupIdentifier() ) + if count != "*": + jobFile += "_job%i" % ( count ) + jobFile += "_time%s" % ( time.strftime("%Y-%m-%d-%H-%M-%S") ) + else: + jobFile += "_job*" + jobFile += "_time%s-*" % ( time.strftime("%Y-%m") ) + jobFile += ".py" + return jobFile + + + def getCmdUpdateJobStatusAsString( self, newStatus ): + """ + Return the command to update the job status in the table. + """ + prg = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py" + cmd = prg + cmd += " -t %s" % ( self.job.tablename ) + if str(self.job.jobid).isdigit(): + cmd += " -j %s" % ( self.job.jobname ) + else: + cmd += " -j %s" % ( self.job.jobid ) + cmd += " -g %s" % ( self.job.groupid ) + if self.job.queue != "": + cmd += " -q %s" % ( self.job.queue ) + cmd += " -s %s" % ( newStatus ) + cmd += " -c %s" % ( self.getConfigFile() ) + cmd += " -v %i" % ( self._verbose ) + return "os.system( \"%s\" )\n" % ( cmd ) + + + def getCmdToLaunchWrapper( self, fileName, genericCmd, exeWrapper ): + """ + Return the launching command as a string. + Launch the wrapper, retrieve its exit status, update status if error. + """ + specificCmd = genericCmd.replace( GENERIC_IN_FILE, fileName ) + cmd = "" + cmd += "print \"LAUNCH: %s\"\n" % ( specificCmd ) + cmd += "sys.stdout.flush()\n" + cmd += "exitStatus = os.system ( \"%s\" )\n" % ( specificCmd ) + cmd += "if exitStatus != 0:\n" + cmd += "\tprint \"ERROR: wrapper '%s'" % ( exeWrapper ) + cmd += " returned exit status '%i'\" % ( exitStatus )\n" + cmd += "\tos.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() ) + cmd += "\tshutil.move( newDir, '%s' )\n" % ( self.getCurrentDirectory() ) + cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) ) + cmd += "\tsys.exit(1)\n" + return cmd + + + def getCmdToKeepFiles( self, fileName, lFilesToKeep ): + """ + Return the commands to keep the output files. + """ + cmd = "" + for f in lFilesToKeep: + f = f.replace( GENERIC_IN_FILE, fileName ) + cmd += "if not os.path.exists( \"%s\" ):\n" % ( f ) + cmd += "\tprint \"ERROR: output file '%s' doesn't exist\"\n" % ( f ) + cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) ) + cmd += "\tsys.exit(1)\n" + cmd += "if not os.path.exists( \"%s/%s\" ):\n" \ + % ( self._currentDir, f ) + cmd += "\tshutil.copy( \"%s\", \"%s/%s\" )\n" % ( f, self.getCurrentDirectory(), f ) + return cmd + + + def getCmdToRemoveFiles( self, fileName, lFilesToRemove ): + """ + Return the commands to remove the temporary files. + """ + cmd = "" + if lFilesToRemove != []: + for f in lFilesToRemove: + f = f.replace( GENERIC_IN_FILE, fileName ) + cmd += "if os.path.exists( \"%s\" ):\n" % ( f ) + cmd += "\tos.remove( \"%s\" )\n" % ( f ) + return cmd + + + def getJobCommandsAsString( self, fileName, jobName, minFreeGigaInTmpDir=1 ): + """ + Return all the job commands as a string. + """ + cmd = "#!/usr/bin/env python\n" + cmd += "\n" + cmd += "import os\n" + cmd += "import sys\n" + cmd += "import shutil\n" + cmd += "import time\n" + cmd += "\n" + cmd += "print \"system:\", os.uname()\n" + cmd += "beginTime = time.time()\n" + cmd += "print 'beginTime=%f' % ( beginTime )\n" + cmd += "\n" + cmd += self.getCmdUpdateJobStatusAsString( "running" ) + cmd += "\n" + cmd += "if not os.path.exists( \"%s\" ):\n" % ( self.getTemporaryDirectory() ) + cmd += "\tprint \"ERROR: working dir '%s' doesn't exist\"\n" % ( \ + self.getTemporaryDirectory() ) + cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) ) + cmd += "\tsys.exit(1)\n" + cmd += "freeSpace = os.statvfs( \"%s\" )\n" % ( self.getTemporaryDirectory() ) + cmd += "if ( freeSpace.f_bavail * freeSpace.f_frsize ) / 1073741824.0 < %i:\n" % ( minFreeGigaInTmpDir ) # nb blocs * bloc size in bytes > 1 GigaByte ? + cmd += "\tprint \"ERROR: less than %iGb in '%s'\"\n" % ( minFreeGigaInTmpDir, self.getTemporaryDirectory() ) + cmd += "\t%s" % ( self.getCmdUpdateJobStatusAsString( "error" ) ) + cmd += "\tsys.exit(1)\n" + cmd += "print \"working dir: %s\"\n" % ( self.getTemporaryDirectory() ) + cmd += "sys.stdout.flush()\n" + cmd += "os.chdir( \"%s\" )\n" % ( self.getTemporaryDirectory() ) + cmd += "\n" + cmd += "newDir = \"%s_%s_%s\"\n" % ( self.getGroupIdentifier(), jobName, time.strftime("%Y%m%d-%H%M%S") ) + cmd += "if os.path.exists( newDir ):\n" + cmd += "\tshutil.rmtree( newDir )\n" + cmd += "os.mkdir( newDir )\n" + cmd += "os.chdir( newDir )\n" + cmd += "\n" + cmd += "if not os.path.exists( \"%s\" ):\n" % ( fileName ) + cmd += "\tos.symlink( \"%s/%s\", \"%s\" )\n" % \ + ( self._inputDir, fileName, fileName ) + cmd += "\n" + + for pL in self.lPrgLaunchers: + cmd += self.getCmdToLaunchWrapper( \ + fileName, \ + pL.getWrapperCommandLine(), \ + "%s.py" % ( type(pL).__name__ ) ) + cmd += "\n" + cmd += self.getCmdToKeepFiles( fileName, pL.getListFilesToKeep() ) + cmd += "\n" + cmd += self.getCmdToRemoveFiles( fileName, \ + pL.getListFilesToRemove() ) + + cmd += "if os.path.exists( \"%s\" ):\n" % ( fileName ) + cmd += "\tos.remove( \"%s\" )\n" % ( fileName ) + cmd += "os.chdir( \"..\" )\n" + cmd += "shutil.rmtree( newDir )\n" + cmd += self.getCmdUpdateJobStatusAsString( "finished" ) + cmd += "\n" + cmd += "endTime = time.time()\n" + cmd += "print 'endTime=%f' % ( endTime)\n" + cmd += "print 'executionTime=%f' % ( endTime - beginTime )\n" + cmd += "print \"system:\", os.uname()\n" + cmd += "sys.exit(0)\n" + return cmd + + def getStatsOfExecutionTime( self ): + """ + Return a Stat object about the execution time of each job as a + float expressed in seconds since the epoch, in UTC. + """ + stat = Stat() + pattern = "%s/%s*.o*" % ( self.getCurrentDirectory(), \ + self.getAcronym() ) + lJobFiles = glob.glob( pattern ) + for f in lJobFiles: + fH = open( f, "r" ) + while True: + line = fH.readline() + if line == "": + break + if "executionTime" in line: + stat.add( float(line[:-1].split("=")[1] ) ) + break + fH.close() + return stat + + + def formatGroupidAndTime(self): + return self.job.groupid + " " + time.strftime("%Y-%m-%d %H:%M:%S") + "" + + def submitJob(self, lInFiles): + count = 0 + for inFile in lInFiles: + count += 1 + fileName = os.path.basename(inFile) + if self._verbose > 1: + print "processing '%s' # %i..." % (fileName, count) + sys.stdout.flush() + + self.initializeJob(fileName, count) + time.sleep(0.5) + exitStatus = self.jobdb.submitJob(self.job) + if exitStatus != 0: + print "ERROR while submitting job '%i' to the cluster" % (count) + sys.exit(1) + + def checkClusterLauncherAttributes( self ): + if self.getInputDirectory() == "": + message = "ERROR: missing input directory" + raise CheckerException(message) + if not os.path.exists( self.getInputDirectory() ): + message = "ERROR: input directory '%s' doesn't exist" % ( self.getInputDirectory() ) + raise CheckerException(message) + if self.getGroupIdentifier() == "": + message = "ERROR: missing group identifier" + raise CheckerException(message) + if self.getAcronym() == "": + message = "ERROR: missing program acronym" + raise CheckerException(message) + if self.getConfigFile() == "": + message = "ERROR: missing config file to access MySQL" + raise CheckerException(message) + if not os.path.exists( self.getConfigFile() ): + message = "ERROR: config file '%s' doesn't exist" % ( self.getConfigFile() ) + raise CheckerException(message) + if self.getTemporaryDirectory() == "": + self.setTemporaryDirectory(self._currentDir) + + def checkGenericAttributes( self ): + self.checkClusterLauncherAttributes() + + def checkProgramLauncherAttributes( self ): + self.getProgramLauncherInstance().checkSpecificAttributes() + + def checkSpecificAttributes( self ): + self.checkProgramLauncherAttributes() + + def start( self ): + + if self.lPrgLaunchers == []: + self.setSingleProgramLauncher() + for pL in self.lPrgLaunchers: + if pL.getWrapperCommandLine() == "": + string = "ERROR: wrapper command is empty !" + print string + sys.exit(1) + if pL.getProgramCommandLine() == "": + string = "ERROR: program command is empty !" + print string + sys.exit(1) + self.checkProgramAvailability() + + try: + self.checkProgramLauncherAttributes() + except CheckerException, msg: + print msg + print self.getHelpAsString() + sys.exit(1) + + if self.getVerbosityLevel() > 0: + string = "START %s" % ( type(self).__name__ ) + print string + self.job.tablename = self.getJobTableName() + self.job.groupid = self.getGroupIdentifier() + tokens = self.getQueueName().replace("'","").split(" ") + self.job.setQueue( tokens[0] ) + if len(tokens) > 1: + lResources = tokens[1:] + self.job.lResources = lResources + if self.getVerbosityLevel() > 0: + print "groupid: %s" % ( self.getGroupIdentifier() ) + self.jobdb = RepetJob( cfgFileName=self.getConfigFile() ) + if self.jobdb.hasUnfinishedJob( self.job.tablename, \ + self.job.groupid ): + self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid ) + return + self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid ) + sys.stdout.flush() + + def end( self ): + if self.getClean(): + self.removeAllJobFiles() + self.removeAllJobStdouts() + self.removeAllJobStderrs() + + if self.getCatOutputFiles(): + self.catOutputFiles() + + self.jobdb.close() + + if self.getVerbosityLevel() > 0: + string = "END %s" % ( type(self).__name__ ) + print string + sys.stdout.flush() + + def run( self ): + try: + self.checkClusterLauncherAttributes() + except CheckerException, msg: + print msg + print self.getHelpAsString() + sys.exit(1) + + self.start() + + lInFiles = self.getInputFilesList() + self._nbJobs = len(lInFiles) + + if self._verbose > 0: + string = "submitting " + str(self._nbJobs) + " jobs... " + self.formatGroupidAndTime() + print string; sys.stdout.flush() + + self.submitJob(lInFiles) + + if self._verbose > 0: + string = "waiting for jobs... " + self.formatGroupidAndTime() + print string; sys.stdout.flush() + + self.jobdb.waitJobGroup( self.job.tablename, self.job.groupid ) + + if self._verbose > 0: + string = "all jobs completed ! " + self.formatGroupidAndTime() + print string; sys.stdout.flush() + statsExecutionTime = self.getStatsOfExecutionTime() + print "execution time of all jobs (seconds): %f" % statsExecutionTime.getSum() + print "execution time per job: %s" % statsExecutionTime.string() + sys.stdout.flush() + + self.jobdb.cleanJobGroup( self.job.tablename, self.job.groupid ) + + self.end() + +