Mercurial > repos > yufei-luo > s_mart
comparison commons/pyRepetUnit/components/AbstractClusterLauncher.py @ 18:94ab73e8a190
Uploaded
| author | m-zytnicki |
|---|---|
| date | Mon, 29 Apr 2013 03:20:15 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 17:b0e8584489e6 | 18:94ab73e8a190 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 | |
| 3 # Copyright INRA (Institut National de la Recherche Agronomique) | |
| 4 # http://www.inra.fr | |
| 5 # http://urgi.versailles.inra.fr | |
| 6 # | |
| 7 # This software is governed by the CeCILL license under French law and | |
| 8 # abiding by the rules of distribution of free software. You can use, | |
| 9 # modify and/ or redistribute the software under the terms of the CeCILL | |
| 10 # license as circulated by CEA, CNRS and INRIA at the following URL | |
| 11 # "http://www.cecill.info". | |
| 12 # | |
| 13 # As a counterpart to the access to the source code and rights to copy, | |
| 14 # modify and redistribute granted by the license, users are provided only | |
| 15 # with a limited warranty and the software's author, the holder of the | |
| 16 # economic rights, and the successive licensors have only limited | |
| 17 # liability. | |
| 18 # | |
| 19 # In this respect, the user's attention is drawn to the risks associated | |
| 20 # with loading, using, modifying and/or developing or reproducing the | |
| 21 # software by the user in light of its specific status of free software, | |
| 22 # that may mean that it is complicated to manipulate, and that also | |
| 23 # therefore means that it is reserved for developers and experienced | |
| 24 # professionals having in-depth computer knowledge. Users are therefore | |
| 25 # encouraged to load and test the software's suitability as regards their | |
| 26 # requirements in conditions enabling the security of their systems and/or | |
| 27 # data to be ensured and, more generally, to use and operate it in the | |
| 28 # same conditions as regards security. | |
| 29 # | |
| 30 # The fact that you are presently reading this means that you have had | |
| 31 # knowledge of the CeCILL license and that you accept its terms. | |
| 32 | |
| 33 import getopt | |
| 34 import time | |
| 35 import glob | |
| 36 import sys | |
| 37 import os | |
| 38 | |
| 39 from commons.core.checker.CheckerException import CheckerException | |
| 40 from commons.core.sql.RepetJob import RepetJob | |
| 41 from commons.core.sql.Job import Job | |
| 42 from commons.core.stat.Stat import Stat | |
| 43 from pyRepet.launcher.AbstractProgramLauncher import AbstractProgramLauncher | |
| 44 | |
# Placeholder that stands for the input file name in wrapper command lines and
# in the lists of files to keep/remove; it is replaced per job by the real name.
GENERIC_IN_FILE = "zDUMMYz"


## Abstract class to launch a program in parallel on a cluster.
#
class AbstractClusterLauncher(object):  # ( IClusterLauncher )
    """
    Abstract base class to launch a program in parallel on a cluster.

    One job is created per file matching '<inputDir>/*.<suffix>'. For each job,
    getJobCommandsAsString() builds the text of a Python script which runs the
    wrapper command of every program launcher in 'self.lPrgLaunchers' inside a
    dedicated sub-directory of the temporary directory, keeps/removes output
    files, and reports its status via '$REPET_PATH/bin/srptChangeJobStatus.py'.

    NOTE(review): this class calls checkProgramAvailability(), initializeJob(),
    removeAllJobFiles(), removeAllJobStdouts(), removeAllJobStderrs() and
    catOutputFiles() without defining them; presumably subclasses provide them.
    """

    def __init__(self):
        """
        Constructor: set the default value of every launcher attribute.
        """
        self._inputDir = ""             # path to the directory with input files
        self._queueName = ""            # name of the queue on the cluster
        self._groupId = ""              # identifier of the group of jobs (groupid)
        self._inputFileSuffix = "fa"    # suffix of the input files (default='fa')
        self._prgAcronym = ""           # acronym of the program to launch
        self._configFile = ""           # name of the configuration file (connect to MySQL)
        self._currentDir = os.getcwd()  # path to the current directory
        self._tmpDir = ""               # path to the temporary directory
        self._jobTable = "jobs"         # name of the table recording the jobs
        self._catOutFiles = False       # concatenate output files of all jobs
        self._clean = False             # clean job file, job stdout, job table...
        self._verbose = 1               # verbosity level
        self.jobdb = None               # RepetJob instance (created in start())
        self.job = Job()                # Job instance

        self._nbJobs = 0
        # getopt option strings; getCmdLineOptions() concatenates them.
        self._cmdLineGenericOptions = "hi:Q:g:S:a:C:d:j:Zcv:"
        self._cmdLineSpecificOptions = ""

        self._exeWrapper = "AbstractProgramLauncher.py"
        self._prgLauncher = None
        # list of instances derived from AbstractProgramLauncher()
        # If several program are launched successively in the same job,
        # 'lPrgLaunchers' has to be filled before run().
        self.lPrgLaunchers = []

    def setProgramLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Forward a command-line option to the program launcher instance.

        @param o: option flag (e.g. '-x')
        @param a: option argument, if any
        """
        self.getProgramLauncherInstance().setASpecificAttributeFromCmdLine(o, a)

    def setClusterLauncherAttributeFromCmdLine(self, o, a=""):
        """
        Set the generic cluster-launcher attribute matching a command-line option.

        '-h' prints the help and exits with status 0. Options not listed here
        are ignored so that program-specific options can pass through.

        @param o: option flag (e.g. '-i')
        @param a: option argument, if any
        """
        if o == "-h":
            print(self.getHelpAsString())
            sys.exit(0)
        elif o == "-i":
            self.setInputDirectory(a)
        elif o == "-Q":
            self.setQueueName(a)
        elif o == "-g":
            self.setGroupIdentifier(a)
        elif o == "-S":
            self.setInputFileSuffix(a)
        elif o == "-a":
            self.setAcronym(a)
        elif o == "-C":
            self.setConfigFile(a)
        elif o == "-d":
            self.setTemporaryDirectory(a)
        elif o == "-j":
            self.setJobTableName(a)
        elif o == "-Z":
            self.setCatOutputFiles()
        elif o == "-c":
            self.setClean()
        elif o == "-v":
            self.setVerbosityLevel(a)

    def setAttributesFromCmdLine(self):
        """
        Parse sys.argv and hand every option to both the cluster launcher and
        the program launcher. On a parsing error, print the error and the
        help, then exit with status 1.
        """
        try:
            opts, _ = getopt.getopt(sys.argv[1:], self.getCmdLineOptions())
        except getopt.GetoptError as err:
            print(str(err))
            print(self.getHelpAsString())
            sys.exit(1)
        for o, a in opts:
            self.setClusterLauncherAttributeFromCmdLine(o, a)
            self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setAGenericAttributeFromCmdLine(self, o, a=""):
        """Alias of setClusterLauncherAttributeFromCmdLine()."""
        self.setClusterLauncherAttributeFromCmdLine(o, a)

    def setASpecificAttributeFromCmdLine(self, o, a=""):
        """Alias of setProgramLauncherAttributeFromCmdLine()."""
        self.setProgramLauncherAttributeFromCmdLine(o, a)

    def setInputDirectory(self, arg):
        self._inputDir = arg

    def setQueueName(self, arg):
        self._queueName = arg

    def setGroupIdentifier(self, arg):
        self._groupId = arg

    def setInputFileSuffix(self, arg):
        self._inputFileSuffix = arg

    def setAcronym(self, arg):
        self._prgAcronym = arg

    def setConfigFile(self, arg):
        """
        Set the configuration file; a bare file name is resolved against the
        current working directory.
        """
        if os.path.dirname(arg) == "":
            self._configFile = os.path.join(os.getcwd(), arg)
        else:
            self._configFile = arg

    def setCurrentDirectory(self, arg=None):
        """
        Set the current directory.

        @param arg: directory path; when omitted, the working directory at the
                    time of the call is used (the former default was frozen
                    when the class was defined).
        """
        if arg is None:
            arg = os.getcwd()
        self._currentDir = arg

    def setTemporaryDirectory(self, arg):
        self._tmpDir = arg

    def setJobTableName(self, arg):
        self._jobTable = arg

    def setCatOutputFiles(self):
        self._catOutFiles = True

    def setClean(self):
        self._clean = True

    def setVerbosityLevel(self, arg):
        self._verbose = int(arg)

    def setExecutableWrapper(self, arg="AbstractProgramLauncher.py"):
        self._exeWrapper = arg

    def setSingleProgramLauncher(self):
        """
        Set the wrapper and program command-lines of the program launcher.
        Append the program launcher to 'self.lPrgLaunchers'.
        """
        prgLauncher = self.getProgramLauncherInstance()
        prgLauncher.setWrapperCommandLine()
        prgLauncher.setProgramCommandLine()
        self.lPrgLaunchers.append(prgLauncher)

    def getGenericHelpAsString(self):
        """
        Return the usage text of the generic (cluster-launcher) options.
        """
        lines = [
            "usage: %s.py [options]" % type(self).__name__,
            "generic options:",
            " -h: this help",
            " -i: directory with input files (absolute path)",
            " -Q: name of the queue on the cluster",
            " -g: identifier of the group of jobs (groupid)",
            " -S: suffix of the input files (default='fa')",
            " -a: acronym of the program to be launched (default='%s')" % self.getAcronym(),
            " -C: configuration file to connect to MySQL (absolute path or in current dir)",
            " -d: temporary directory (absolute path, default=current directory)",
            " -j: table recording the jobs (default='jobs')",
            " -Z: concatenate the output files of all jobs",
            " -c: clean the temporary data",
            " -v: verbosity level (0/1/2, default=1)",
        ]
        return "\n".join(lines)

    def getSpecificHelpAsString(self):
        """
        Return the usage text of the program-specific options.
        Meant to be overridden; the base class has no specific option.
        """
        return ""

    def getHelpAsString(self):
        """Return the full usage text (generic + specific options)."""
        specific = self.getSpecificHelpAsString()
        if specific is None:  # tolerate overrides that return nothing
            specific = ""
        return self.getGenericHelpAsString() + specific

    def getInputDirectory(self):
        return self._inputDir

    def getQueueName(self):
        return self._queueName

    def getGroupIdentifier(self):
        return self._groupId

    def getInputFileSuffix(self):
        return self._inputFileSuffix

    def getAcronym(self):
        return self._prgAcronym

    def getConfigFile(self):
        return self._configFile

    def getCurrentDirectory(self):
        return self._currentDir

    def getTemporaryDirectory(self):
        return self._tmpDir

    def getJobTableName(self):
        return self._jobTable

    def getCatOutputFiles(self):
        return self._catOutFiles

    def getClean(self):
        return self._clean

    def getVerbosityLevel(self):
        return self._verbose

    def getWrapperName(self):
        return self.getProgramLauncherInstance().getWrapperName()

    def getProgramName(self):
        return self.getProgramLauncherInstance().getProgramName()

    def getPatternToConcatenate(self):
        """
        Return a glob pattern matching the output files of all jobs, built by
        replacing the input-file placeholder with '*'.
        """
        return self.getProgramLauncherInstance().getOutputFile().replace(GENERIC_IN_FILE, "*")

    def getProgramLauncherInstance(self):
        """
        Return the program launcher, creating a default
        AbstractProgramLauncher on first access.
        """
        if self._prgLauncher is None:
            self._prgLauncher = AbstractProgramLauncher()
        return self._prgLauncher

    def getInputFilesList(self):
        """
        Return the paths of the input files ('<inputDir>/*.<suffix>'), sorted
        so that job numbering is reproducible (glob order is arbitrary).
        """
        return sorted(glob.glob("%s/*.%s" % (self._inputDir, self._inputFileSuffix)))

    def getCmdLineOptions(self):
        """
        Return the getopt option string: generic options followed by any
        specific ones (default: "hi:Q:g:S:a:C:d:j:Zcv:").
        """
        return self._cmdLineGenericOptions + self._cmdLineSpecificOptions

    def getProgramCommandLineAsString(self):
        """
        Return the command-line to launch in each job.
        Specified in each wrapper.
        """
        pass

    def getListFilesToKeep(self):
        """
        Return the list of files to keep at the end of each job.
        Specified in each wrapper.
        """
        pass

    def getListFilesToRemove(self):
        """
        Return the list of files to remove at the end of each job.
        Specified in each wrapper.
        """
        pass

    def getJobFileNameAsString(self, count):
        """
        Return the name of the job file as a string.

        @param count: job number (e.g. 1 or '1'), or '*' to get a glob pattern
        @type count: integer or string
        """
        jobFile = "ClusterLauncher"
        jobFile += "_groupid%s" % self.getGroupIdentifier()
        if count != "*":
            jobFile += "_job%i" % int(count)
            jobFile += "_time%s" % time.strftime("%Y-%m-%d-%H-%M-%S")
        else:
            jobFile += "_job*"
            # NOTE(review): the pattern is pinned to the current year-month,
            # so job files created in a previous month are not matched.
            jobFile += "_time%s-*" % time.strftime("%Y-%m")
        jobFile += ".py"
        return jobFile

    def getCmdUpdateJobStatusAsString(self, newStatus):
        """
        Return a line of Python code, for the job script, that calls
        srptChangeJobStatus.py to set the status of the current job.

        Reads os.environ["REPET_PATH"] (KeyError if unset).
        """
        cmd = os.environ["REPET_PATH"] + "/bin/srptChangeJobStatus.py"
        cmd += " -t %s" % self.job.tablename
        # When the job id is purely numeric, the job name is passed instead.
        if str(self.job.jobid).isdigit():
            cmd += " -j %s" % self.job.jobname
        else:
            cmd += " -j %s" % self.job.jobid
        cmd += " -g %s" % self.job.groupid
        if self.job.queue != "":
            cmd += " -q %s" % self.job.queue
        cmd += " -s %s" % newStatus
        cmd += " -c %s" % self.getConfigFile()
        cmd += " -v %i" % self._verbose
        return "os.system( \"%s\" )\n" % cmd

    def getCmdToLaunchWrapper(self, fileName, genericCmd, exeWrapper):
        """
        Return the launching command as a string.
        Launch the wrapper, retrieve its exit status, update status if error.

        On error, the generated code moves the job directory ('newDir') back
        to the current directory before exiting with status 1.
        """
        specificCmd = genericCmd.replace(GENERIC_IN_FILE, fileName)
        cmd = ""
        cmd += "print \"LAUNCH: %s\"\n" % specificCmd
        cmd += "sys.stdout.flush()\n"
        cmd += "exitStatus = os.system ( \"%s\" )\n" % specificCmd
        cmd += "if exitStatus != 0:\n"
        cmd += "\tprint \"ERROR: wrapper '%s'" % exeWrapper
        cmd += " returned exit status '%i'\" % ( exitStatus )\n"
        cmd += "\tos.chdir( \"%s\" )\n" % self.getTemporaryDirectory()
        cmd += "\tshutil.move( newDir, '%s' )\n" % self.getCurrentDirectory()
        cmd += "\t%s" % self.getCmdUpdateJobStatusAsString("error")
        cmd += "\tsys.exit(1)\n"
        return cmd

    def getCmdToKeepFiles(self, fileName, lFilesToKeep):
        """
        Return the commands to keep the output files: fail the job if a file
        is missing, otherwise copy it to the current directory (unless
        already there). A None or empty list yields "".
        """
        cmd = ""
        if not lFilesToKeep:
            return cmd
        # Identical for every file: build it once.
        errorCmd = self.getCmdUpdateJobStatusAsString("error")
        currentDir = self.getCurrentDirectory()
        for f in lFilesToKeep:
            f = f.replace(GENERIC_IN_FILE, fileName)
            cmd += "if not os.path.exists( \"%s\" ):\n" % f
            cmd += "\tprint \"ERROR: output file '%s' doesn't exist\"\n" % f
            cmd += "\t%s" % errorCmd
            cmd += "\tsys.exit(1)\n"
            cmd += "if not os.path.exists( \"%s/%s\" ):\n" % (currentDir, f)
            cmd += "\tshutil.copy( \"%s\", \"%s/%s\" )\n" % (f, currentDir, f)
        return cmd

    def getCmdToRemoveFiles(self, fileName, lFilesToRemove):
        """
        Return the commands to remove the temporary files (only those that
        exist). A None or empty list yields "".
        """
        cmd = ""
        for f in lFilesToRemove or []:
            f = f.replace(GENERIC_IN_FILE, fileName)
            cmd += "if os.path.exists( \"%s\" ):\n" % f
            cmd += "\tos.remove( \"%s\" )\n" % f
        return cmd

    def getJobCommandsAsString(self, fileName, jobName, minFreeGigaInTmpDir=1):
        """
        Return all the job commands as a string.

        The generated script checks the temporary directory exists and has at
        least 'minFreeGigaInTmpDir' GB free, works in a fresh sub-directory
        where the input file is symlinked, runs every program launcher, then
        cleans up and reports 'finished'.
        NOTE: the generated code uses Python 2 'print' statements.
        """
        tmpDir = self.getTemporaryDirectory()
        cmd = "#!/usr/bin/env python\n"
        cmd += "\n"
        cmd += "import os\n"
        cmd += "import sys\n"
        cmd += "import shutil\n"
        cmd += "import time\n"
        cmd += "\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "beginTime = time.time()\n"
        cmd += "print 'beginTime=%f' % ( beginTime )\n"
        cmd += "\n"
        cmd += self.getCmdUpdateJobStatusAsString("running")
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % tmpDir
        cmd += "\tprint \"ERROR: working dir '%s' doesn't exist\"\n" % tmpDir
        cmd += "\t%s" % self.getCmdUpdateJobStatusAsString("error")
        cmd += "\tsys.exit(1)\n"
        cmd += "freeSpace = os.statvfs( \"%s\" )\n" % tmpDir
        # nb blocks * block size in bytes, converted to GB
        cmd += "if ( freeSpace.f_bavail * freeSpace.f_frsize ) / 1073741824.0 < %i:\n" % minFreeGigaInTmpDir
        cmd += "\tprint \"ERROR: less than %iGb in '%s'\"\n" % (minFreeGigaInTmpDir, tmpDir)
        cmd += "\t%s" % self.getCmdUpdateJobStatusAsString("error")
        cmd += "\tsys.exit(1)\n"
        cmd += "print \"working dir: %s\"\n" % tmpDir
        cmd += "sys.stdout.flush()\n"
        cmd += "os.chdir( \"%s\" )\n" % tmpDir
        cmd += "\n"
        cmd += "newDir = \"%s_%s_%s\"\n" % (self.getGroupIdentifier(), jobName, time.strftime("%Y%m%d-%H%M%S"))
        cmd += "if os.path.exists( newDir ):\n"
        cmd += "\tshutil.rmtree( newDir )\n"
        cmd += "os.mkdir( newDir )\n"
        cmd += "os.chdir( newDir )\n"
        cmd += "\n"
        cmd += "if not os.path.exists( \"%s\" ):\n" % fileName
        cmd += "\tos.symlink( \"%s/%s\", \"%s\" )\n" % (self.getInputDirectory(), fileName, fileName)
        cmd += "\n"

        for pL in self.lPrgLaunchers:
            cmd += self.getCmdToLaunchWrapper(fileName,
                                              pL.getWrapperCommandLine(),
                                              "%s.py" % type(pL).__name__)
            cmd += "\n"
            cmd += self.getCmdToKeepFiles(fileName, pL.getListFilesToKeep())
            cmd += "\n"
            cmd += self.getCmdToRemoveFiles(fileName, pL.getListFilesToRemove())

        cmd += "if os.path.exists( \"%s\" ):\n" % fileName
        cmd += "\tos.remove( \"%s\" )\n" % fileName
        cmd += "os.chdir( \"..\" )\n"
        cmd += "shutil.rmtree( newDir )\n"
        cmd += self.getCmdUpdateJobStatusAsString("finished")
        cmd += "\n"
        cmd += "endTime = time.time()\n"
        cmd += "print 'endTime=%f' % ( endTime)\n"
        cmd += "print 'executionTime=%f' % ( endTime - beginTime )\n"
        cmd += "print \"system:\", os.uname()\n"
        cmd += "sys.exit(0)\n"
        return cmd

    def getStatsOfExecutionTime(self):
        """
        Return a Stat object about the execution time of each job, in seconds.

        Reads the first 'executionTime=<float>' line of every file matching
        '<currentDir>/<acronym>*.o*' (the job stdout files, presumably).
        """
        stat = Stat()
        pattern = "%s/%s*.o*" % (self.getCurrentDirectory(), self.getAcronym())
        for jobStdout in glob.glob(pattern):
            with open(jobStdout, "r") as fH:
                for line in fH:
                    if "executionTime" in line:
                        # float() ignores the trailing newline/whitespace.
                        stat.add(float(line.split("=")[1]))
                        break
        return stat

    def formatGroupidAndTime(self):
        """Return '<groupid> <YYYY-mm-dd HH:MM:SS>' for progress messages."""
        return self.job.groupid + " " + time.strftime("%Y-%m-%d %H:%M:%S")

    def submitJob(self, lInFiles):
        """
        Submit one job per input file, numbered from 1; exit with status 1 if
        a submission returns a non-zero status.
        """
        for count, inFile in enumerate(lInFiles, 1):
            fileName = os.path.basename(inFile)
            if self._verbose > 1:
                print("processing '%s' # %i..." % (fileName, count))
                sys.stdout.flush()

            self.initializeJob(fileName, count)
            time.sleep(0.5)  # throttle submissions
            exitStatus = self.jobdb.submitJob(self.job)
            if exitStatus != 0:
                print("ERROR while submitting job '%i' to the cluster" % count)
                sys.exit(1)

    def checkClusterLauncherAttributes(self):
        """
        Check the generic attributes; raise CheckerException on the first
        problem. Default the temporary directory to the current directory.
        """
        if self.getInputDirectory() == "":
            raise CheckerException("ERROR: missing input directory")
        if not os.path.exists(self.getInputDirectory()):
            raise CheckerException("ERROR: input directory '%s' doesn't exist" % self.getInputDirectory())
        if self.getGroupIdentifier() == "":
            raise CheckerException("ERROR: missing group identifier")
        if self.getAcronym() == "":
            raise CheckerException("ERROR: missing program acronym")
        if self.getConfigFile() == "":
            raise CheckerException("ERROR: missing config file to access MySQL")
        if not os.path.exists(self.getConfigFile()):
            raise CheckerException("ERROR: config file '%s' doesn't exist" % self.getConfigFile())
        if self.getTemporaryDirectory() == "":
            self.setTemporaryDirectory(self.getCurrentDirectory())

    def checkGenericAttributes(self):
        self.checkClusterLauncherAttributes()

    def checkProgramLauncherAttributes(self):
        self.getProgramLauncherInstance().checkSpecificAttributes()

    def checkSpecificAttributes(self):
        self.checkProgramLauncherAttributes()

    def start(self):
        """
        Prepare the launch: validate the program launchers, configure
        self.job, and connect to the job table. If the group still has
        unfinished jobs, wait for them and return early; otherwise clean the
        group's previous job records.
        """
        if not self.lPrgLaunchers:
            self.setSingleProgramLauncher()
        for pL in self.lPrgLaunchers:
            if pL.getWrapperCommandLine() == "":
                print("ERROR: wrapper command is empty !")
                sys.exit(1)
            if pL.getProgramCommandLine() == "":
                print("ERROR: program command is empty !")
                sys.exit(1)
            # NOTE(review): nesting inside the loop reconstructed from the
            # indentation-less source; confirm.
            self.checkProgramAvailability()

        try:
            self.checkProgramLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        if self.getVerbosityLevel() > 0:
            print("START %s" % type(self).__name__)
        self.job.tablename = self.getJobTableName()
        self.job.groupid = self.getGroupIdentifier()
        # The queue string may carry resources after the queue name,
        # separated by spaces (quotes are stripped).
        tokens = self.getQueueName().replace("'", "").split(" ")
        self.job.setQueue(tokens[0])
        if len(tokens) > 1:
            self.job.lResources = tokens[1:]
        if self.getVerbosityLevel() > 0:
            print("groupid: %s" % self.getGroupIdentifier())
        self.jobdb = RepetJob(cfgFileName=self.getConfigFile())
        if self.jobdb.hasUnfinishedJob(self.job.tablename, self.job.groupid):
            self.jobdb.waitJobGroup(self.job.tablename, self.job.groupid)
            return
        self.jobdb.cleanJobGroup(self.job.tablename, self.job.groupid)
        sys.stdout.flush()

    def end(self):
        """
        Finish the launch: optional cleaning and concatenation of outputs,
        then close the job-table connection.
        """
        if self.getClean():
            self.removeAllJobFiles()
            self.removeAllJobStdouts()
            self.removeAllJobStderrs()

        if self.getCatOutputFiles():
            self.catOutputFiles()

        self.jobdb.close()

        if self.getVerbosityLevel() > 0:
            print("END %s" % type(self).__name__)
            sys.stdout.flush()

    def run(self):
        """
        Check the attributes, submit one job per input file, wait for the
        whole group, report execution-time statistics (when verbose), clean
        the job group and finish.
        """
        try:
            self.checkClusterLauncherAttributes()
        except CheckerException as msg:
            print(msg)
            print(self.getHelpAsString())
            sys.exit(1)

        self.start()

        lInFiles = self.getInputFilesList()
        self._nbJobs = len(lInFiles)

        if self._verbose > 0:
            print("submitting " + str(self._nbJobs) + " jobs... " + self.formatGroupidAndTime())
            sys.stdout.flush()

        self.submitJob(lInFiles)

        if self._verbose > 0:
            print("waiting for jobs... " + self.formatGroupidAndTime())
            sys.stdout.flush()

        self.jobdb.waitJobGroup(self.job.tablename, self.job.groupid)

        if self._verbose > 0:
            print("all jobs completed ! " + self.formatGroupidAndTime())
            sys.stdout.flush()
            statsExecutionTime = self.getStatsOfExecutionTime()
            print("execution time of all jobs (seconds): %f" % statsExecutionTime.getSum())
            print("execution time per job: %s" % statsExecutionTime.string())
            sys.stdout.flush()

        self.jobdb.cleanJobGroup(self.job.tablename, self.job.groupid)

        self.end()
| 604 self.end() | |
| 605 | |
| 606 |
