# Mercurial > repos > yufei-luo > s_mart
# view commons/core/sql/JobAdaptator.py @ 6:769e306b7933
# Change the repository level.
# author   | yufei-luo
# date     | Fri, 18 Jan 2013 04:54:14 -0500
# parents  |
# children |
# line wrap: on
# line source
# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import os
import time
import sys
import tempfile
import subprocess
from commons.core.sql.Job import Job


## Methods for Job persistence
#
# Base adaptor that keeps an in-memory list of submitted job ids and polls
# the batch scheduler through 'qstat'. The persistence methods are no-op
# stubs in this class. submitJob() relies on _getQsubCommand() and
# _getJobidFromJobManager(), which are only defined by the scheduler-specific
# subclasses (JobAdaptatorSGE, JobAdaptatorTorque); calling it on the base
# class raises AttributeError.
#
class JobAdaptator(object):

    ## Constructor
    #
    # @param lJob list of job ids to track; the list is used by reference and
    #        appended to. Defaults to a new empty list for each instance.
    # @param table string table name (stored, not used by this class)
    #
    def __init__(self, lJob=None, table=""):
        # A fresh list per instance: a shared mutable default ([]) would make
        # every adaptor created without lJob accumulate the same job ids.
        if lJob is None:
            lJob = []
        self._lJobID = lJob
        self._table = table
        # Job-name prefix used to filter 'qstat' output; set by submitJob().
        self._acronym = ""

    ## Record a job
    #
    # @param job value appended as-is to the tracked list (submitJob() passes
    #        the integer job id returned by the scheduler)
    #
    def recordJob(self, job):
        self._lJobID.append(job)

    ## Remove a job from the job table (no-op in this class)
    #
    # @param job: job instance to remove
    #
    def removeJob(self, job):
        pass

    ## Set the jobid of a job with the id of SGE (no-op in this class)
    #
    # @param job job instance
    # @param jobid integer
    #
    def updateJobIdInDB(self, job, jobid):
        pass

    ## Get a job status (no-op in this class, returns None)
    #
    # @param job: a Job instance with the job informations
    #
    def getJobStatus(self, job):
        pass

    ## Change a job status (no-op in this class)
    #
    # @param job: a Job instance with the job informations
    # @param status: the new status (waiting,finished,error)
    #
    def changeJobStatus(self, job, status):
        pass

    ## Get the number of jobs belonging to the desired groupid with the
    # desired status (no-op in this class, returns None).
    #
    # @param groupid string a group identifier to record related job series
    # @param status string job status (waiting, running, finished, error)
    # @return int
    #
    def getCountStatus(self, groupid, status):
        pass

    ## Clean all job from a job group (no-op in this class)
    #
    # @param groupid: a group identifier to record related job series
    #
    def cleanJobGroup(self, groupid):
        pass

    ## Check if there is unfinished job from a job group (no-op in this
    # class, returns None).
    #
    # @param groupid string a group identifier to record related job series
    #
    def hasUnfinishedJob(self, groupid):
        pass

    ## Get the ids of the jobs listed by 'qstat' whose line matches the acronym
    #
    # Runs "qstat | grep <acronym>" through the shell and keeps the first
    # whitespace-separated token of each output line, with any ".<suffix>"
    # removed. Presumably Torque's qstat shows ids as "<number>.<server>", the
    # same form JobAdaptatorTorque._getJobidFromJobManager parses; SGE ids
    # have no suffix, so they are unaffected.
    #
    # @return list of job id strings; empty when the pipeline exits non-zero
    #         (e.g. grep found no matching line)
    #
    def _getJobIDListFromQstat(self):
        cmd = "qstat | grep %s" % self._acronym
        # Read the pipe directly: the former temporary file leaked whenever
        # grep exited non-zero, which is the normal "no job left" case.
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                   universal_newlines=True)
        output = process.communicate()[0]
        lJobIDFromQstat = []
        if process.returncode != 0:
            return lJobIDFromQstat
        for line in output.splitlines():
            tokens = line.split()
            if tokens:
                lJobIDFromQstat.append(tokens[0].split(".")[0])
        return lJobIDFromQstat

    ## Tell whether any tracked job id is still listed by the scheduler
    #
    # @param lJobID list of tracked job ids (ints or numeric strings)
    # @param lJobIDFromQstat list of id strings from _getJobIDListFromQstat()
    # @return True if at least one id of lJobID appears in lJobIDFromQstat
    #
    def _areJobsStillRunning(self, lJobID, lJobIDFromQstat):
        sRunningIDs = set()
        for jobid in lJobIDFromQstat:
            try:
                sRunningIDs.add(int(jobid))
            except ValueError:
                # Not a numeric job id (e.g. a stray grep match): ignore it.
                continue
        return any(int(jobid) in sRunningIDs for jobid in lJobID)

    ## Wait until none of the tracked jobs is listed by 'qstat' any more.
    #
    # Sleeps checkInterval seconds, then polls; repeats until no job recorded
    # in this adaptor is still listed. All recorded jobs are considered,
    # whatever their group. Jobs are NOT relaunched by this implementation:
    # groupid, maxRelaunch, exitIfTooManyErrors and timeOutPerJob are accepted
    # for interface compatibility but unused.
    # NOTE(review): if 'qstat' itself fails, the job list is empty and the
    # wait ends immediately.
    #
    # @param groupid string a group identifier (unused)
    # @param checkInterval integer seconds between two checks (default = 5)
    # @param maxRelaunch integer (unused, default = 3)
    # @param exitIfTooManyErrors boolean (unused, default = True)
    # @param timeOutPerJob integer seconds (unused, default = 60*60)
    #
    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3,
                     exitIfTooManyErrors=True, timeOutPerJob=60*60):
        while True:
            time.sleep(checkInterval)
            lJobIDFromQstat = self._getJobIDListFromQstat()
            if not self._areJobsStillRunning(self._lJobID, lJobIDFromQstat):
                break

    ## Submit a job to a queue and record its id.
    #
    # On success, sets job.jobid, records the id via recordJob() and sets the
    # 'qstat' filter acronym to the first 10 characters of the part of
    # job.jobname before the first "_".
    #
    # @param job a job instance
    # @param verbose integer; > 0 prints a confirmation line (default = 0)
    # @param maxNbWaitingJobs integer (unused, default = 10000)
    # @param checkInterval integer (unused, default = 30)
    # @return the exit code of the submission command
    #
    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
        cmd = self._getQsubCommand(job)
        # A real file is kept because _getJobidFromJobManager() takes a file
        # handle (and may be overridden elsewhere).
        tmp = tempfile.NamedTemporaryFile(delete=False)
        try:
            process = subprocess.Popen(cmd, shell=True, stdout=tmp)
            process.communicate()
            tmp.close()
            if process.returncode == 0:
                with open(tmp.name, "r") as jobidFileHandler:
                    jobid = self._getJobidFromJobManager(jobidFileHandler)
                if verbose > 0:
                    sys.stdout.write("job '%i %s' submitted\n" % (jobid, job.jobname))
                    sys.stdout.flush()
                job.jobid = jobid
                self._acronym = job.jobname.split("_")[0][:10]
                self.recordJob(job.jobid)
        finally:
            # Always remove the temporary file, also when qsub failed.
            tmp.close()
            os.remove(tmp.name)
        return process.returncode

    ## Get the list of nodes where jobs of one group were executed
    # (no-op in this class, returns None).
    #
    # @param groupId string a group identifier of job series
    # @return lNodes list of nodes names without redundancy
    #
    def getNodesListByGroupId(self, groupId):
        pass

    ## No-op in this class.
    def checkJobTable(self):
        pass

    ## No-op in this class.
    def close(self):
        pass

    ## Split an id of the form "<jobid>.<nbJob>[:...]"
    #
    # @param jobid string, e.g. "12.3:1" -> ("12", "3")
    # @return tuple (jobid, nbJob) of strings
    # @raise IndexError if jobid contains no "."
    #
    def _getJobidAndNbJob(self, jobid):
        tab = jobid.split(".")
        jobid = tab[0]
        tab = tab[1].split(":")
        nbJob = tab[0]
        return jobid, nbJob


class JobAdaptatorSGE(JobAdaptator):

    ## Check if a job is still handled by SGE
    #
    # Exits the process with status 1 if 'qstat' fails.
    #
    # @param jobid string job identifier
    # @param jobname string job name
    # @return True if a qstat line has more than 3 fields, its first field
    #         equals str(jobid) and its third field is a prefix of jobname
    #
    def isJobStillHandledBySge(self, jobid, jobname):
        process = subprocess.Popen("qstat", shell=True, stdout=subprocess.PIPE,
                                   universal_newlines=True)
        output = process.communicate()[0]
        if process.returncode != 0:
            msg = "ERROR while launching 'qstat'"
            sys.stderr.write("%s\n" % msg)
            sys.exit(1)
        for line in output.splitlines():
            tokens = line.split()
            # The third column may be a truncated job name, hence the
            # prefix comparison.
            if len(tokens) > 3 and tokens[0] == str(jobid) \
                    and tokens[2] == jobname[0:len(tokens[2])]:
                return True
        return False

    ## Build the shell command that pipes the job launcher into SGE 'qsub'.
    #
    # NOTE(review): job.launcher is wrapped in single quotes and job.jobname
    # is unquoted, so neither may contain shell-special characters.
    #
    # @param job a job instance
    # @return string shell command
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        cmd += "qsub"
        cmd += " -V"
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        cmd += " -cwd"
        if job.lResources != []:
            cmd += " -l \"" + " ".join(job.lResources) + "\""
        if job.parallelEnvironment != "":
            cmd += " -pe " + job.parallelEnvironment
        return cmd

    ## Read the job id from the first line of the 'qsub' output.
    #
    # The id is the third space-separated word; presumably SGE prints
    # 'Your job <id> ("<name>") has been submitted' - confirm.
    #
    # @param jobidFileHandler open file handle on the qsub output
    # @return int job id
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(" ")[2])


class JobAdaptatorTorque(JobAdaptator):

    ## Build the shell command that pipes the job launcher into Torque 'qsub'.
    #
    # Runs in the current directory (-d) and renames "mem_free" resources to
    # "mem".
    #
    # @param job a job instance
    # @return string shell command
    #
    def _getQsubCommand(self, job):
        cmd = "echo '%s' | " % job.launcher
        cmd += "qsub"
        cmd += " -V"
        cmd += " -d %s" % os.getcwd()
        cmd += " -N %s" % job.jobname
        if job.queue != "":
            cmd += " -q %s" % job.queue
        if job.lResources != []:
            cmd += " -l \"" + " ".join(job.lResources).replace("mem_free", "mem") + "\""
        return cmd

    ## Read the job id from the first line of the 'qsub' output, keeping
    # only the part before the first "." (e.g. "123.server" -> 123).
    #
    # @param jobidFileHandler open file handle on the qsub output
    # @return int job id
    #
    def _getJobidFromJobManager(self, jobidFileHandler):
        return int(jobidFileHandler.readline().split(".")[0])