diff commons/core/sql/JobAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/sql/JobAdaptator.py	Fri Jan 18 04:54:14 2013 -0500
@@ -0,0 +1,271 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software.  You can  use, 
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info". 
+#
+# As a counterpart to the access to the source code and  rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty  and the software's author,  the holder of the
+# economic rights,  and the successive licensors  have only  limited
+# liability. 
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading,  using,  modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean  that it is complicated to manipulate,  and  that  also
+# therefore means  that it is reserved for developers  and  experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or 
+# data to be ensured and,  more generally, to use and operate it in the 
+# same conditions as regards security. 
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+
+import os
+import time
+import sys
+import tempfile
+import subprocess
+from commons.core.sql.Job import Job
+
+## Methods for Job persistence 
+#
+class JobAdaptator(object):
+    
+    def __init__(self, lJob = None, table = ""):
+        # Avoid a shared mutable default argument: each instance gets its own
+        # job identifier list.
+        self._lJobID = lJob if lJob is not None else []
+        self._table = table
+        self._acronym = ""
+
+    ## Record a job
+    #
+    # @param job Job instance with the job information
+    #
+    def recordJob(self, job):
+        self._lJobID.append(job)
+    
+    ## Remove a job from the job table
+    #
+    #  @param job: job instance to remove
+    #
+    def removeJob(self, job):
+        pass         
+            
+    ## Set the jobid of a job with the identifier assigned by SGE
+    #
+    # @param job job instance
+    # @param jobid integer
+    #
+    def updateJobIdInDB(self, job, jobid):
+        pass
+        
+    ## Get a job status
+    #
+    # @param job: a Job instance with the job information
+    #
+    def getJobStatus(self, job):
+        pass
+    
+    
+    ## Change a job status
+    #
+    # @param job: a Job instance with the job information
+    # @param status: the new status (waiting,finished,error)
+    #
+    def changeJobStatus(self, job, status):
+        pass
+        
+    ## Get the number of jobs belonging to the desired groupid with the desired status.
+    #
+    # @param groupid string a group identifier to record related job series 
+    # @param status string job status (waiting, running, finished, error)
+    # @return int
+    #
+    def getCountStatus(self, groupid, status):
+        pass
+        
+    ## Clean all jobs from a job group
+    #
+    # @param groupid: a group identifier to record related job series
+    #
+    def cleanJobGroup(self, groupid):
+        pass            
+            
+    ## Check if there are unfinished jobs in a job group.
+    #
+    # @param groupid string a group identifier to record related job series 
+    #        
+    def hasUnfinishedJob(self, groupid):
+        pass
+
+    def _getJobIDListFromQstat(self):
+        lJobIDFromQstat = []
+        tmp = tempfile.NamedTemporaryFile(delete=False)
+        cmd ="qstat | grep %s" % self._acronym
+        process = subprocess.Popen(cmd, shell=True,stdout=tmp)
+        process.communicate()
+        tmp.close()
+        if process.returncode == 0:
+            fileName = tmp.name
+            jobidFileHandler = open(fileName, "r")        
+            for line in jobidFileHandler:
+                line2 = line.lstrip(" ")
+                lJobIDFromQstat.append(line2.split(" ")[0])
+            jobidFileHandler.close()
+            os.remove(fileName)
+        return lJobIDFromQstat     
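+    # Illustrative note (assumed qstat layout, not shown in the source): each
+    # matched line typically starts with the job identifier, e.g.
+    # "  123456 0.55500 BLAST_chun user r 01/18/2013 ...", so after lstrip()
+    # the split(" ")[0] above yields "123456".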
+     
+    def _areJobsStillRunning(self, lJobID, lJobIDFromQstat):
+        # Return True as soon as one recorded job identifier is still listed
+        # in the qstat output.
+        for i in lJobID:
+            for j in lJobIDFromQstat:
+                if int(i) == int(j):
+                    return True
+        return False
+                
+        
+    ## Wait until all jobs of a job group are finished.
+    #  Jobs are re-launched on error (max. 3 times).
+    #
+    # @param groupid string a group identifier to record related job series
+    # @param checkInterval integer interval in seconds between two checks (default = 5)
+    # @param maxRelaunch integer max nb of times a job in error is relaunched before exiting (default = 3)
+    # @param exitIfTooManyErrors boolean exit if a job is still in error above maxRelaunch (default = True)
+    # @param timeOutPerJob integer max nb of seconds after which one checks whether a job is still handled by SGE (default = 60*60 = 1h)
+    #
+    def waitJobGroup(self, groupid, checkInterval=5, maxRelaunch=3, exitIfTooManyErrors=True, timeOutPerJob=60*60):
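+        # In this implementation the method polls qstat every checkInterval
+        # seconds and returns once none of the recorded job identifiers is
+        # still listed (see _areJobsStillRunning()).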
+        
+        while True:
+            time.sleep(checkInterval)
+            lJobIDFromQstat = self._getJobIDListFromQstat()
+            if not self._areJobsStillRunning(self._lJobID, lJobIDFromQstat):
+                break
+    
+    ## Submit a job to a queue and record it in job table.
+    #
+    # @param job a job instance
+    # @param maxNbWaitingJobs integer max nb of waiting jobs before submitting a new one (default = 10000)
+    # @param checkInterval integer interval in seconds between two checks (default = 30)
+    # @param verbose integer (default = 0)
+    #               
+    def submitJob(self, job, verbose=0, maxNbWaitingJobs=10000, checkInterval=30):
+        cmd = self._getQsubCommand(job)
+        tmp = tempfile.NamedTemporaryFile(delete=False)
+        process = subprocess.Popen(cmd, shell=True, stdout=tmp)
+        process.communicate()
+        tmp.close()
+        if process.returncode == 0:
+            fileName = tmp.name
+            jobidFileHandler = open(fileName, "r")
+            jobid = self._getJobidFromJobManager(jobidFileHandler)
+            if verbose > 0:
+                print "job '%i %s' submitted" % (jobid, job.jobname)
+                sys.stdout.flush()
+            job.jobid = jobid
+            #newJob= Job(job.jobid, job.jobname, job.groupid, job.queue, job.command, job.launcher, job.node, job.lResources, job.parallelEnvironment)
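+            # The acronym (first 10 characters of the jobname prefix before
+            # the first underscore) is what _getJobIDListFromQstat() greps for
+            # in the qstat output; only the numeric job identifier is recorded
+            # in self._lJobID.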
+            self._acronym = job.jobname.split("_")[0][:10]
+            self.recordJob(job.jobid)
+            jobidFileHandler.close()
+            os.remove(fileName)
+        return process.returncode
+
+
+    ## Get the list of nodes where jobs of one group were executed
+    #
+    # @param groupId string a group identifier of job series
+    # @return lNodes list of node names without redundancy
+    #
+    def getNodesListByGroupId(self, groupId):
+        pass
+    
+    def checkJobTable(self):
+        pass
+    
+    def close(self):
+        pass
+    
+    def _getJobidAndNbJob(self, jobid) :
+        tab = jobid.split(".")
+        jobid = tab[0]
+        tab = tab[1].split(":")
+        nbJob = tab[0]
+        return jobid, nbJob
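+    # Illustrative sketch (assumed identifier format, not stated in the
+    # source): for a string such as "123456.10:1", the method above returns
+    # ("123456", "10"), i.e. the job identifier and the number of jobs.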
+    
+class JobAdaptatorSGE(JobAdaptator):
+
+    ## Check if a job is still handled by SGE
+    #
+    # @param jobid string job identifier
+    # @param jobname string job name
+    #  
+    def isJobStillHandledBySge(self, jobid, jobname):
+        isJobInQstat = False
+        tmp = tempfile.NamedTemporaryFile(delete=False)
+        cmd = "qstat"
+        process = subprocess.Popen(cmd, shell=True, stdout=tmp)
+        process.communicate()
+        tmp.close()
+        qstatFile = tmp.name
+        if process.returncode != 0:
+            msg = "ERROR while launching 'qstat'"
+            sys.stderr.write("%s\n" % msg)
+            sys.exit(1)
+        qstatFileHandler = open(qstatFile, "r")
+        lLines = qstatFileHandler.readlines()
+        for line in lLines:
+            tokens = line.split()
+            if len(tokens) > 3 and tokens[0] == str(jobid) and tokens[2] == jobname[0:len(tokens[2])]:
+                isJobInQstat = True
+                break
+        qstatFileHandler.close()
+        os.remove(qstatFile)
+        return isJobInQstat
+    
+    def _getQsubCommand(self, job):    
+        cmd = "echo '%s' | " % job.launcher
+        prg = "qsub"
+        cmd += prg
+        cmd += " -V"
+        cmd += " -N %s" % job.jobname
+        if job.queue != "":
+            cmd += " -q %s" % job.queue
+        cmd += " -cwd"
+        if job.lResources != []:
+            cmd += " -l \""
+            cmd += " ".join(job.lResources)
+            cmd += "\""
+        if job.parallelEnvironment != "":
+            cmd += " -pe " + job.parallelEnvironment
+        return cmd
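+    # For illustration, with a hypothetical job (jobname="BLAST_chunk1",
+    # queue="all.q", lResources=["mem_free=1G"], parallelEnvironment=""),
+    # the method above builds:
+    #   echo '<launcher>' | qsub -V -N BLAST_chunk1 -q all.q -cwd -l "mem_free=1G"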
+    
+    def _getJobidFromJobManager(self, jobidFileHandler):
+        return int(jobidFileHandler.readline().split(" ")[2])
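+    # Assumption for illustration: SGE's qsub typically answers with a line
+    # such as 'Your job 123456 ("BLAST_chunk1") has been submitted', so
+    # split(" ")[2] above extracts 123456.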
+    
+
+class JobAdaptatorTorque(JobAdaptator):  
+        
+    def _getQsubCommand(self, job):    
+        cmd = "echo '%s' | " % job.launcher
+        prg = "qsub"
+        cmd += prg
+        cmd += " -V"
+        cmd += " -d %s" % os.getcwd()
+        cmd += " -N %s" % job.jobname
+        if job.queue != "":
+            cmd += " -q %s" % job.queue
+        if job.lResources != []:
+            cmd += " -l \""
+            cmd += " ".join(job.lResources).replace("mem_free","mem")
+            cmd += "\""
+        return cmd
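+    # For illustration, with a hypothetical job (jobname="BLAST_chunk1",
+    # queue="batch", lResources=["mem_free=1G"]), the method above builds:
+    #   echo '<launcher>' | qsub -V -d <cwd> -N BLAST_chunk1 -q batch -l "mem=1G"
+    # note that "mem_free" is rewritten to "mem" for Torque.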
+
+    def _getJobidFromJobManager(self, jobidFileHandler):
+        return int(jobidFileHandler.readline().split(".")[0])
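+    # Assumption for illustration: Torque's qsub typically answers with a line
+    # such as "123456.servername", so split(".")[0] above yields 123456.
+
+
+# Minimal usage sketch (illustrative only, assuming a Job instance 'job' built
+# elsewhere with the attributes used above: jobname, groupid, queue, command,
+# launcher, lResources and parallelEnvironment):
+#
+#   iJA = JobAdaptatorSGE()
+#   if iJA.submitJob(job, verbose=1) == 0:
+#       iJA.waitJobGroup(job.groupid)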