Mercurial > repos > yufei-luo > s_mart
view commons/core/sql/DbSQLite.py @ 15:440ceca58672
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 22 Apr 2013 11:08:07 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
import sqlite3
import os
import sys

#TODO: update...compare with DbMySql.py

## Thin wrapper around one SQLite connection and one cursor
#
# The connection is opened by the constructor; queries go through
# execute() and results are read back with fetchall().
#
class DbSQLite(object):

    ## Constructor
    #
    # Writes an error to stderr and exits with status 1 when no host is given.
    #
    # @param host string db file path
    #
    def __init__(self, host=""):
        if host == "":
            msg = "ERROR: no host specified"
            sys.stderr.write("%s\n" % msg)
            sys.exit(1)
        self.host = host
        # remove open() and cursor from init() use directly outside this class ...
        # NOTE(review): if open() fails, self.db is never assigned and the
        # next line raises AttributeError.
        self.open()
        self.cursor = self.db.cursor()

    ## Connect to the DbSQLite database
    #
    # The connection is opened with isolation_level=None, with
    # check_same_thread disabled and with declared-type detection enabled.
    #
    # @param verbose integer (default = 0), the error is printed when > 0
    # @param nb integer unused, kept so existing callers keep working
    # @return boolean True if the connection was opened, False otherwise
    #
    def open(self, verbose=0, nb=0):
        try:
            self.db = sqlite3.connect(
                self.host,
                check_same_thread=False,
                isolation_level=None,
                detect_types=sqlite3.PARSE_DECLTYPES,
            )
        except sqlite3.Error as e:
            if verbose > 0:
                print("ERROR %s" % e)
                sys.stdout.flush()
            return False
        return True

    ## Execute a SQL query
    #
    # A failing query is attempted a second time; if it fails again the
    # error is printed and swallowed (nothing is raised to the caller).
    #
    # @param qry string SQL query to execute
    # @param params parameters of SQL query
    #
    def execute(self, qry, params=None):
        args = (qry,) if params is None else (qry, params)
        try:
            self.cursor.execute(*args)
        except Exception:
            #TODO Must be test
            try:
                self.cursor.execute(*args)
            except Exception as e:
                print("Erreur : %s" % e)

    ## Retrieve the results of a SQL query
    #
    def fetchall(self):
        return self.cursor.fetchall()

    ## Record a new table in the 'info_tables' table
    #
    # The 'info_tables' table is created first when it does not exist.
    #
    # @param tableName string table name
    # @param info string information on the table origin
    #
    def updateInfoTable(self, tableName, info):
        if not self.doesTableExist("info_tables"):
            sqlCmd = "CREATE TABLE info_tables ( name varchar(255), file varchar(255) )"
            self.execute(sqlCmd)
        # Bound parameters: values containing quotes are stored verbatim
        # instead of breaking the statement.
        sqlCmd = "INSERT INTO info_tables VALUES (?, ?)"
        self.execute(sqlCmd, (tableName, info))

    ## Create a table of the given type
    #
    # Only the job type ("job" or "jobs", case-insensitive) is supported;
    # any other type prints an error, closes the connection and exits
    # with status 1.
    #
    # @param tableName string new table name
    # @param dataType string table type
    # @param overwrite boolean drop an existing table first (default = False)
    # @param verbose integer (default = 0)
    #
    def createTable(self, tableName, dataType, overwrite=False, verbose=0):
        if verbose > 0:
            # The message used to carry three placeholders for two values,
            # which raised TypeError as soon as verbose was set.
            print("creating table '%s' of type '%s'..." % (tableName, dataType))
            sys.stdout.flush()
        if overwrite:
            self.dropTable(tableName)
        if dataType.lower() in ["job", "jobs"]:
            self.createJobTable(tableName)
        else:
            print("ERROR: unknown type %s" % (dataType))
            self.close()
            sys.exit(1)
        if verbose > 0:
            print("done!")
            sys.stdout.flush()

    ## Create a job table
    #
    # Also records the table in 'info_tables' and indexes its groupid column.
    #
    # @param tablename new table name
    #
    def createJobTable(self, tablename):
        columns = [
            "jobid INT UNSIGNED",
            "jobname VARCHAR(255)",
            "groupid VARCHAR(255)",
            "command TEXT",
            "launcher VARCHAR(1024)",
            "queue VARCHAR(255)",
            "status VARCHAR(255)",
            "time timestamp",
            "node VARCHAR(255)",
        ]
        sqlCmd = "CREATE TABLE %s ( %s )" % (tablename, ", ".join(columns))
        self.execute(sqlCmd)
        self.updateInfoTable(tablename, "job table")
        # NOTE(review): the index name is fixed, so creating a second job
        # table in the same database presumably fails at this step (the
        # failure is swallowed by execute()) -- confirm whether a per-table
        # index name is wanted.
        sqlCmd = "CREATE INDEX igroupid ON " + tablename + " ( groupid )"
        self.execute(sqlCmd)

    ## Test if a table exists
    #
    # @param table string table name
    # @return boolean True if the table exists, False otherwise
    #
    def doesTableExist(self, table):
        qry = "PRAGMA table_info(%s)" % (table)
        self.execute(qry)
        # table_info yields one row per column, so no row means no table
        return bool(self.cursor.fetchall())

    ## Test if a table has no row
    #
    # @param tableName string table name
    # @return boolean True if the table is empty, False otherwise
    #
    def isEmpty(self, tableName):
        return self.getSize(tableName) == 0

    ## Give the rows number of the table
    #
    # @param tableName string table name
    # @return integer number of rows
    #
    def getSize(self, tableName):
        qry = "SELECT count(*) FROM %s;" % (tableName)
        self.execute(qry)
        res = self.fetchall()
        return int(res[0][0])

    ## Remove a table if it exists
    #
    # The matching row of 'info_tables' is removed as well.
    #
    # @param table string table name
    # @param verbose integer (default = 0), unused
    #
    def dropTable(self, table, verbose=0):
        if not self.doesTableExist(table):
            return
        sqlCmd = "DROP TABLE %s" % (table)
        self.execute(sqlCmd)
        # The table may have been created without going through
        # updateInfoTable(), in which case there is nothing to clean up.
        if self.doesTableExist("info_tables"):
            # Bound parameter: a double-quoted value equal to a column name
            # ("name", "file") would be read by SQLite as that column and
            # the DELETE would hit the wrong rows.
            sqlCmd = "DELETE FROM info_tables WHERE name = ?"
            self.execute(sqlCmd, (table,))

    ## Get a list with the fields
    #
    # @param table string table name
    # @return list of the column names, in table order
    #
    def getFieldList(self, table):
        sqlCmd = "PRAGMA table_info(%s)" % (table)
        self.execute(sqlCmd)
        # the column name is the second field of each table_info row
        return [res[1] for res in self.fetchall()]

    ## delete this SQLite database session
    #
    # Removes the database file from disk.
    #
    def delete(self):
        os.remove(self.host)

    ## Close the connection
    #
    def close(self):
        self.db.close()