Mercurial > repos > yufei-luo > s_mart
diff commons/core/sql/JobAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/sql/JobAdaptator.py Fri Jan 18 04:54:14 2013 -0500 @@ -0,0 +1,271 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + + +import os +import time +import sys +import tempfile +import subprocess +from commons.core.sql.Job import Job + +## Methods for Job persistence +# +class JobAdaptator(object): + + def __init__(self, lJob = [], table = "" ): + self._lJobID = lJob + self._table = table + self._acronym = "" + ## Record a job + # + # @param job Job instance with the job informations + # + def recordJob(self, job): + self._lJobID.append(job) + + ## Remove a job from the job table + # + # @param job: job instance to remove + # + def removeJob(self, job): + pass + + ## Set the jobid of a job with the id of SGE + # + # @param job job instance + # @param jobid integer + # + def updateJobIdInDB(self, job, jobid): + pass + + ## Get a job status + # + # @param job: a Job instance with the job informations + # + def getJobStatus(self, job): + pass + + + ## Change a job status + # + # @param job: a Job instance with the job informations + # @param status: the new status (waiting,finished,error) + # + def changeJobStatus(self, job, status): + pass + + ## Get the number of jobs belonging to the desired groupid with the desired status. + # + # @param groupid string a group identifier to record related job series + # @param status string job status (waiting, running, finished, error) + # @return int + # + def getCountStatus(self, groupid, status): + pass + + ## Clean all job from a job group + # + # @param groupid: a group identifier to record related job series + # + def cleanJobGroup(self, groupid): + pass + + ## Check if there is unfinished job from a job group. + # + # @param groupid string a group identifier to record related job series + # + def hasUnfinishedJob(self, groupid): + pass + + def _getJobIDListFromQstat(self): + lJobIDFromQstat = [] + tmp = tempfile.NamedTemporaryFile(delete=False) + cmd ="qstat | grep %s" % self._acronym + process = subprocess.Popen(cmd, shell=True,stdout=tmp) + process.communicate() + tmp.close() + if process.returncode == 0: + fileName = tmp.name + jobidFileHandler = open(fileName, "r") + for line in jobidFileHandler: + line2 = line.lstrip(" ") + lJobIDFromQstat.append(line2.split(" ")[0]) + jobidFileHandler.close() + os.remove(fileName) + return lJobIDFromQstat + + def _areJobsStillRunning(self,lJobID,lJobIDFromQstat): + sorted(lJobID) + sorted(lJobIDFromQstat) + for i in lJobID: + for j in lJobIDFromQstat: + if int(i)== int(j): + return True + return False + + + ## Wait job finished status from a job group. + # Job are re-launched if error (max. 3 times) + # + # @param groupid string a group identifier to record related job series + # @param checkInterval integer time laps in seconds between two checks (default = 5) + # @param maxRelaunch integer max nb of times a job in error is relaunch before exiting (default = 3) + # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True) + # @param timeOutPerJob integer max nb of seconds after which one tests if a job is still in SGE or not (default = 60*60=1h) + # + def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60): + + while True: + time.sleep(checkInterval) + lJobIDFromQstat = self._getJobIDListFromQstat() + if self._areJobsStillRunning(self._lJobID, lJobIDFromQstat) == False: + break + + ## Submit a job to a queue and record it in job table. + # + # @param job a job instance + # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000) + # @param checkInterval integer time laps in seconds between two checks (default = 30) + # @param verbose integer (default = 0) + # + def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30): + cmd = self._getQsubCommand(job) + tmp = tempfile.NamedTemporaryFile(delete=False) + process = subprocess.Popen(cmd, shell=True,stdout=tmp) + process.communicate() + tmp.close() + if process.returncode == 0: + fileName = tmp.name + jobidFileHandler = open(fileName, "r") + jobid = self._getJobidFromJobManager(jobidFileHandler) + if verbose > 0: + print "job '%i %s' submitted" % (jobid, job.jobname) + sys.stdout.flush() + job.jobid = jobid + #newJob= Job(job.jobid, job.jobname, job.groupid, job.queue, job.command, job.launcher, job.node, job.lResources, job.parallelEnvironment) + self._acronym = job.jobname.split("_")[0][:10] + self.recordJob(job.jobid) + jobidFileHandler.close() + os.remove(fileName) + return process.returncode + + + ## Get the list of nodes where jobs of one group were executed + # + # @param groupid string a group identifier of job series + # @return lNodes list of nodes names without redundancy + # + def getNodesListByGroupId(self, groupId): + pass + + def checkJobTable(self): + pass + + def close(self): + pass + + def _getJobidAndNbJob(self, jobid) : + tab = jobid.split(".") + jobid = tab[0] + tab = tab[1].split(":") + nbJob = tab[0] + return jobid, nbJob + +class JobAdaptatorSGE(JobAdaptator): + + ## Check if a job is still handled by SGE + # + # @param jobid string job identifier + # @param jobname string job name + # + def isJobStillHandledBySge(self, jobid, jobname): + isJobInQstat = False + tmp = tempfile.NamedTemporaryFile(delete=False) + cmd = "qstat" + process = subprocess.Popen(cmd, shell=True,stdout=tmp) + process.communicate() + tmp.close() + qstatFile = tmp.name + if process.returncode != 0: + msg = "ERROR while launching 'qstat'" + sys.stderr.write( "%s\n" % msg ) + sys.exit(1) + qstatFileHandler = open(qstatFile, "r") + lLines = qstatFileHandler.readlines() + for line in lLines: + tokens = line.split() + if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]: + isJobInQstat = True + break + qstatFileHandler.close() + os.remove(qstatFile) + return isJobInQstat + + def _getQsubCommand(self, job): + cmd = "echo '%s' | " % job.launcher + prg = "qsub" + cmd += prg + cmd += " -V" + cmd += " -N %s" % job.jobname + if job.queue != "": + cmd += " -q %s" % job.queue + cmd += " -cwd" + if job.lResources != []: + cmd += " -l \"" + cmd += " ".join(job.lResources) + cmd += "\"" + if job.parallelEnvironment != "": + cmd += " -pe " + job.parallelEnvironment + return cmd + + def _getJobidFromJobManager(self, jobidFileHandler): + return int(jobidFileHandler.readline().split(" ")[2]) + + +class JobAdaptatorTorque(JobAdaptator): + + def _getQsubCommand(self, job): + cmd = "echo '%s' | " % job.launcher + prg = "qsub" + cmd += prg + cmd += " -V" + cmd += " -d %s" % os.getcwd() + cmd += " -N %s" % job.jobname + if job.queue != "": + cmd += " -q %s" % job.queue + if job.lResources != []: + cmd += " -l \"" + cmd += " ".join(job.lResources).replace("mem_free","mem") + cmd += "\"" + return cmd + + def _getJobidFromJobManager(self, jobidFileHandler): + return int(jobidFileHandler.readline().split(".")[0])