Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/sql/test/Test_DbMySql.py @ 0:e0f8dcca02ed
Uploaded S-MART tool, a toolbox that manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/sql/test/Test_DbMySql.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,1554 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. 
+ +import unittest +import time +import os +from MySQLdb import ProgrammingError +from commons.core.sql.DbMySql import DbMySql +from commons.core.sql.DbMySql import TABLE_SCHEMA_DESCRIPTOR +from commons.core.sql.DbMySql import TABLE_TYPE_SYNONYMS +from commons.core.utils.FileUtils import FileUtils +from commons.core.coord.Path import Path + +class Test_DbMySql( unittest.TestCase ): + + def setUp( self ): + self._iDb = DbMySql( ) + self._uniqId = "%s" % time.strftime("%Y%m%d%H%M%S") + + def tearDown( self ): + if self._iDb.db.open: + self._iDb.close() + self._iDb = None + + def test_execute_syntax_error(self): + expErrorMsg = "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'CHAUD TABLES' at line 1" + obsErrorMsg = "" + sqlCmd = "CHAUD TABLES" + try: + self._iDb.execute(sqlCmd) + except ProgrammingError as excep: + obsErrorMsg = excep.args[1] + + self.assertEquals(expErrorMsg, obsErrorMsg) + + def test_execute_with_1_retry(self): + tableName = "dummyTable%s" % self._uniqId + sqlCmd = "CREATE TABLE %s (dummyColumn varchar(255))" % tableName + self._iDb.close() + self._iDb.execute(sqlCmd) + self.assertTrue(self._iDb.doesTableExist(tableName)) + self._iDb.dropTable(tableName) + + def test_setAttributesFromConfigFile(self): + expHost = "dummyHost" + expUser = "dummyUser" + expPw = "dummyPw" + expDb = "dummyDb" + expPort = 1000 + + configFileName = "dummyConfigFileName.cfg" + f = open( configFileName, "w" ) + f.write("[repet_env]\n") + f.write("repet_host: " + expHost + "\n") + f.write("repet_user: " + expUser + "\n") + f.write("repet_pw: " + expPw + "\n") + f.write("repet_db: " + expDb + "\n") + f.write("repet_port: " + str(expPort) + "\n") + f.close() + + self._iDb.setAttributesFromConfigFile(configFileName) + + obsHost = self._iDb.host + obsUser = self._iDb.user + obsPw = self._iDb.passwd + obsDb = self._iDb.dbname + obsPort = self._iDb.port + + os.remove(configFileName) + + 
self.assertEquals( expHost, obsHost ) + self.assertEquals( expUser, obsUser ) + self.assertEquals( expPw, obsPw ) + self.assertEquals( expDb, obsDb ) + self.assertEquals( expPort, obsPort ) + + def test_open_True(self): + self._iDb.close() + self.assertTrue(self._iDb.open()) + self.assertEquals(1, self._iDb.db.open) + self._iDb.close() + self.assertEquals(0, self._iDb.db.open) + + def test_open_False(self): + self._iDb.close() + self._iDb.user = "dummyUser" + self.assertFalse( self._iDb.open() ) + + def test_doesTableExist_True(self): + tableName = "dummyTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName ) + self._iDb.execute( sqlCmd ) + self.assertTrue( self._iDb.doesTableExist(tableName) ) + self._iDb.dropTable(tableName) + + def test_doesTableExist_False(self): + tableName = "dummyTable" + self._uniqId + self.assertFalse( self._iDb.doesTableExist(tableName) ) + + def test_dropTable(self): + tableName = "dummyTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + self.assertFalse( self._iDb.doesTableExist(tableName) ) + + def test_renameTable(self): + tableName = "dummyTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.updateInfoTable( tableName, "" ) + newTableName = "newDummyTable" + + self._iDb.renameTable(tableName, newTableName) + + self.assertFalse( self._iDb.doesTableExist(tableName) ) + self.assertTrue( self._iDb.doesTableExist(newTableName) ) + + expTuple = (('newDummyTable', ''),) + sqlCmd = 'SELECT * FROM info_tables WHERE name = "%s"' % ( newTableName ) + self._iDb.execute( sqlCmd ) + obsTuple = self._iDb.cursor.fetchall() + self.assertEquals( expTuple, obsTuple) + + expTuple = () + sqlCmd = 'SELECT * FROM info_tables WHERE name = "%s"' % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTuple = 
self._iDb.cursor.fetchall() + self.assertEquals( expTuple, obsTuple) + + self._iDb.dropTable(newTableName) + + def test_copyTable(self): + tableName = "dummyTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) );" % ( tableName ) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX idummyColumn ON %s ( dummyColumn );" % (tableName) + self._iDb.execute( sqlCmd ) + + newTableName = "newDummyTable" + + self._iDb.copyTable(tableName, newTableName) + + self.assertTrue( self._iDb.doesTableExist(tableName) ) + self.assertTrue( self._iDb.doesTableExist(newTableName) ) + + expTuple = (('newDummyTable', ''),) + sqlCmd = 'SELECT * FROM info_tables WHERE name = "%s";' % ( newTableName ) + self._iDb.execute( sqlCmd ) + obsTuple = self._iDb.cursor.fetchall() + + self.assertEquals( expTuple, obsTuple) + + expTuple = (('newDummyTable', 1L, 'idummyColumn', 1L, 'dummyColumn', 'A', None, None, None, 'YES', 'BTREE', ''),) + sqlCmd = "SHOW INDEX FROM %s;" % ( newTableName ) + self._iDb.execute( sqlCmd ) + obsTuple = self._iDb.cursor.fetchall() + self.assertEquals( expTuple, obsTuple) + + self._iDb.dropTable(tableName) + self._iDb.dropTable(newTableName) + + def test_getTableType(self): + lTypesToTest = TABLE_SCHEMA_DESCRIPTOR.keys() + for tableType in lTypesToTest: + tableName = "dummy%sTable%s" % (tableType, self._uniqId) + self._iDb.createTable(tableName, tableType) + + obsType = self._iDb.getTableType(tableName) + self.assertEquals(tableType, obsType) + + self._iDb.dropTable(tableName) + + def test_getSize_empty_table(self): + tableName = "dummyPathTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "") + 
pathF.close() + self._iDb.loadDataFromFile(tableName, pathFileName, False) + expSize = 0 + obsSize = self._iDb.getSize(tableName) + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expSize, obsSize ) + + def test_getSize_two_rows(self): + tableName = "dummyPathTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + self._iDb.loadDataFromFile(tableName, pathFileName, False) + expSize = 2 + obsSize = self._iDb.getSize(tableName) + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expSize, obsSize ) + + def test_isEmpty_True(self): + tableName = "dummyTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName ) + self._iDb.execute( sqlCmd ) + + fileName = "dummyTableFile.txt" + f = open( fileName, "w" ) + f.write( "" ) + f.close() + self._iDb.loadDataFromFile(tableName, fileName, False) + + self.assertTrue( self._iDb.isEmpty(tableName) ) + + self._iDb.dropTable(tableName) + os.remove(fileName) + + def test_isEmpty_False(self): + tableName = "dummyTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % tableName + self._iDb.execute( sqlCmd ) + + fileName = "dummyTableFile.txt" + f = open( fileName, "w" ) + f.write( "test" ) + f.close() + self._iDb.loadDataFromFile(tableName, fileName, False) + + self.assertFalse( self._iDb.isEmpty(tableName) ) + + self._iDb.dropTable(tableName) + os.remove(fileName) + + def test_updateInfoTable(self): + tableName = "dummyTable" + 
self._uniqId + info = "Table_for_test" + + self._iDb.updateInfoTable(tableName, info) + + sqlCmd = 'SELECT file FROM info_tables WHERE name = "%s"' % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + obsResult = False + if (info,) in results: + obsResult = True + sqlCmd = 'DELETE FROM info_tables WHERE name = "%s"' % ( tableName ) + self._iDb.execute( sqlCmd ) + + self.assertTrue( obsResult ) + + def test_loadDataFromFile_with_empty_file(self): + tableName = "dummyPathTable1" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "" ) + pathF.close() + expTPathTuples = () + + self._iDb.loadDataFromFile(tableName, pathFileName, False) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTPathTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expTPathTuples, obsTPathTuples ) + + def test_loadDataFromFile_with_first_line(self): + tableName = "dummyPathTable2" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + expPathTuple1 = (1L, 'qry', 1L, 100L, 'sbj', 1L, 100L, 1e-123, 136L, 98.4) + expPathTuple2 = (2L, 'qry', 500L, 401L, 
'sbj', 1L, 100L, 1e-152, 161L, 98.7) + expTPathTuples = (expPathTuple1, expPathTuple2) + + self._iDb.loadDataFromFile(tableName, pathFileName, False) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTPathTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expTPathTuples, obsTPathTuples ) + + def test_loadDataFromFile_without_first_line(self): + tableName = "dummyPathTable3" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + expPathTuple = (2L, 'qry', 500L, 401L, 'sbj', 1L, 100L, 1e-152, 161L, 98.7) + expTPathTuples = (expPathTuple,) + + self._iDb.loadDataFromFile(tableName, pathFileName, True) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTPathTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expTPathTuples, obsTPathTuples ) + + def test_createIndex_Map(self): + tableName = "dummyMapTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( name varchar(255), chr varchar(255), start int, end int)" % ( tableName ) + self._iDb.execute( sqlCmd ) + expLIndex = ["iname", "ichr", "istart", "iend", "icoord", "icoord"] + + self._iDb.createIndex(tableName, "map") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + 
self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createIndex_Map_coord_index_already_exist(self): + tableName = "dummyMapTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( name varchar(255), chr varchar(255), start int, end int)" % ( tableName ) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX icoord ON %s ( start,end );" % (tableName) + self._iDb.execute( sqlCmd ) + expLIndex = ["icoord", "icoord", "iname", "ichr", "istart", "iend"] + + self._iDb.createIndex(tableName, "map") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[1:]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createTable_Map( self ): + tableName = "dummyMapTable" + self._uniqId + mapFileName = "dummyMapFile.txt" + mapF = open( mapFileName, "w" ) + mapF.write( "map1\tseq1\t20\t50\n" ) + mapF.write( "map2\tseq2\t700\t760\n" ) + mapF.close() + + expMapTuple1 = ("map1", "seq1", 20L, 50L) + expMapTuple2 = ("map2", "seq2", 700L, 760L) + expTMapTuples = (expMapTuple1, expMapTuple2) + + self._iDb.createTable(tableName, 'map', mapFileName) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTMapTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(mapFileName) + + self.assertEquals( expTMapTuples, obsTMapTuples ) + + def test_createIndex_Match(self): + tableName = "dummyMatchTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int, query_length int unsigned, query_length_perc float, match_length_perc float, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, 
subject_length int unsigned, subject_length_perc float, E_value double, score int unsigned, identity float, path int unsigned)" % ( tableName ) + self._iDb.execute( sqlCmd ) + expLIndex = ["id", "qname", "qstart", "qend", "sname", "sstart", "send", "qcoord", "qcoord"] + + self._iDb.createIndex(tableName, "match") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self._iDb.dropTable(tableName) + self.assertEquals( expLIndex, obsLIndex) + + def test_createIndex_Match_all_index_already_exist(self): + tableName = "dummyMatchTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int, query_length int unsigned, query_length_perc float, match_length_perc float, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, subject_length int unsigned, subject_length_perc float, E_value double, score int unsigned, identity float, path int unsigned)" % ( tableName ) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE UNIQUE INDEX id ON %s ( path );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX qname ON %s ( query_name(10) );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX qstart ON %s ( query_start );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX qend ON %s ( query_end );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX sname ON %s ( subject_name(10) );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX sstart ON %s ( subject_start );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX send ON %s ( subject_end );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX qcoord ON %s ( query_start,query_end );" % (tableName) + self._iDb.execute( sqlCmd ) + expLIndex = ["id", "qname", "qstart", "qend", "sname", "sstart", "send", 
"qcoord", "qcoord"] + + self._iDb.createIndex(tableName, "match") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createTable_match( self ): + tableName = "dummyMatchTable" + self._uniqId + matchFileName = "dummyMatchFile.txt" + matchF = open( matchFileName, "w" ) + matchF.write( "qry1\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" ) + matchF.write( "qry2\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" ) + matchF.close() + + expMatchTuple = ("qry2", 700L, 760L, 60L, 100.0, 100.0, "sbj2", 500L, 560L, 60L, 100.0, 1e-123, 136L, 98.4, 2L) + expTMatchTuples = (expMatchTuple,) + + self._iDb.createTable(tableName, "match", matchFileName) + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTMatchTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(matchFileName) + + self.assertEquals( expTMatchTuples, obsTMatchTuples ) + + def test_createIndex_Path(self): + tableName = "dummyPathTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + expLIndex = ["id", "qname", "qstart", "qend", "sname", "sstart", "send", "qcoord", "qcoord"] + + self._iDb.createIndex(tableName, "path") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, 
tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createIndex_Path_id_and_send_index_already_exist(self): + tableName = "dummyPathTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX id ON %s ( path );" % (tableName) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX send ON %s ( subject_end );" % (tableName) + self._iDb.execute( sqlCmd ) + expLIndex = ["id", "send", "qname", "qstart", "qend", "sname", "sstart", "qcoord", "qcoord"] + + self._iDb.createIndex(tableName, "path") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createTable_path( self ): + tableName = "dummyPathTable" + self._uniqId + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + expPathTuple1 = (1L, "qry", 1L, 100L, "sbj", 1L, 100L, 1e-123, 136L, 98.4) + expPathTuple2 = (2L, "qry", 401L, 500L, "sbj", 100L, 1L, 1e-152, 161L, 98.7) # change coordinates + expTPathTuples = (expPathTuple1, expPathTuple2) + + self._iDb.createTable( tableName, "path", pathFileName) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + 
obsTPathTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expTPathTuples, obsTPathTuples ) + + def test_createIndex_align(self): + tableName = "dummyAlignTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int,subject_name varchar(255), subject_start int unsigned, subject_end int unsigned,E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + expLIndex = ["qname", "qstart", "qend", "sname", "sstart", "send", "qcoord", "qcoord"] + + self._iDb.createIndex(tableName, "align") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createIndex_align_qstart_index_already_exist(self): + tableName = "dummyAlignTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( query_name varchar(255), query_start int, query_end int,subject_name varchar(255), subject_start int unsigned, subject_end int unsigned,E_value double, score int unsigned, identity float)" % ( tableName ) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX qstart ON %s ( query_start );" % (tableName) + self._iDb.execute( sqlCmd ) + expLIndex = ["qstart", "qname", "qend", "sname", "sstart", "send", "qcoord", "qcoord"] + + self._iDb.createIndex(tableName, "align") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + 
self.assertEquals( expLIndex, obsLIndex) + + def test_createTable_align( self ): + tableName = "dummyAlignTable" + self._uniqId + alignFileName = "dummyAlignFile.txt" + alignF = open( alignFileName, "w" ) + alignF.write( "query1\t1\t100\tsubject1\t1\t150\t0.5\t15\t35\n" ) + alignF.write( "query2\t1\t100\tsubject2\t1\t150\t0.5\t15\t35\n" ) + alignF.close() + + expAlignTuple1 = ("query1", 1L, 100L, "subject1", 1L, 150L, 0.5, 15L, 35) + expAlignTuple2 = ("query2", 1L, 100L, "subject2", 1L, 150L, 0.5, 15L, 35) + expTAlignTuples = (expAlignTuple1, expAlignTuple2) + + self._iDb.createTable( tableName, "align", alignFileName ) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTAlignTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(alignFileName) + + self.assertEquals( expTAlignTuples, obsTAlignTuples ) + + def test_createIndex_set(self): + tableName = "dummySetTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, name varchar(255), chr varchar(255), start int, end int)" % ( tableName ) + self._iDb.execute( sqlCmd ) + expLIndex = ["id", "iname", "ichr", "istart", "iend", "icoord", "icoord"] + + self._iDb.createIndex(tableName, "set") + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createIndex_set_id_index_already_exist(self): + tableName = "dummySetTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( path int unsigned, name varchar(255), chr varchar(255), start int, end int)" % ( tableName ) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX id ON %s ( path );" % (tableName) + self._iDb.execute( sqlCmd ) + expLIndex = ["id", "iname", 
"ichr", "istart", "iend", "icoord", "icoord"] + + self._iDb.createIndex(tableName, 'set') + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex[:-1]: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals( expLIndex, obsLIndex) + + def test_createTable_set( self ): + tableName = "dummySetTable" + self._uniqId + setFileName = "dummySetFile.txt" + setF = open( setFileName, "w" ) + setF.write( "15\tset1\tchr1\t1\t100\n" ) + setF.write( "15\tset2\tchr2\t1\t100\n" ) + setF.close() + + expSetTuple1 = (15L, "set1", "chr1", 1L, 100L) + expSetTuple2 = (15L, "set2", "chr2", 1L, 100L) + expTSetTuples = (expSetTuple1, expSetTuple2) + + self._iDb.createTable( tableName, 'set', setFileName ) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTSetTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(setFileName) + + self.assertEquals( expTSetTuples, obsTSetTuples ) + + def test_convertMapTableIntoSetTable( self ): + mapTableName = "dummyMapTable" + self._uniqId + mapFileName = "dummyMapFile.txt" + with open(mapFileName, "w") as mapFH: + mapFH.write("map1\tchr1\t1\t100\n") + mapFH.write("map2\tchr2\t1\t100\n") + + self._iDb.createTable(mapTableName, 'map', mapFileName) + + expSetTuple1 = (1, "map1", "chr1", 1, 100) + expSetTuple2 = (2, "map2", "chr2", 1, 100) + expTSetTuples = (expSetTuple1, expSetTuple2) + + setTableName = "dummySetTable" + self._uniqId + self._iDb.convertMapTableIntoSetTable(mapTableName, setTableName) + + sqlCmd = "SELECT * FROM %s" % setTableName + self._iDb.execute(sqlCmd) + obsTSetTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(mapTableName) + self._iDb.dropTable(setTableName) + os.remove(mapFileName) + + self.assertEquals( expTSetTuples, 
obsTSetTuples ) + + def test_createIndex_seq(self): + tableName = "dummySeqTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( accession varchar(255), sequence longtext, description varchar(255), length int unsigned)" % ( tableName ) + self._iDb.execute( sqlCmd ) + expLIndex = ["iacc", "idescr"] + + self._iDb.createIndex(tableName,'seq') + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals(expLIndex, obsLIndex) + + def test_createIndex_seq_idescr_index_already_exist(self): + tableName = "dummySeqTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( accession varchar(255), sequence longtext, description varchar(255), length int unsigned);" % ( tableName ) + self._iDb.execute( sqlCmd ) + sqlCmd = "CREATE INDEX idescr ON %s ( description(10) );" % ( tableName ) + self._iDb.execute( sqlCmd ) + expLIndex = ["idescr", "iacc"] + + self._iDb.createIndex(tableName,'seq') + + sqlCmd = "SHOW INDEX FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + results = self._iDb.cursor.fetchall() + + for index in expLIndex: + sqlCmd = "DROP INDEX %s ON %s" % ( index, tableName ) + self._iDb.execute( sqlCmd ) + self._iDb.dropTable(tableName) + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + self.assertEquals(expLIndex, obsLIndex) + + def test_createTable_seq( self ): + tableName = "dummySeqTable" + self._uniqId + seqFileName = "dummySeqFile.txt" + seqF = open( seqFileName, "w" ) + seqF.write( ">acc1 seq1\n" ) + seqF.write( "ATACTTCGCTAGCTCGC\n" ) + seqF.write( ">acc2 seq2\n" ) + seqF.write( "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC\n" ) + seqF.close() + + expSeqTuple1 = ("acc1", "ATACTTCGCTAGCTCGC", "acc1 seq1", 17L) + 
expSeqTuple2 = ("acc2", "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC", "acc2 seq2", 68L) + expTSeqTuples = (expSeqTuple1, expSeqTuple2) + + self._iDb.createTable( tableName,'seq', seqFileName ) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTSeqTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(seqFileName) + + self.assertEquals( expTSeqTuples, obsTSeqTuples ) + + def test_createIndex_job(self): + tableName = "dummyTable%s" % self._uniqId + sqlCmd = "CREATE TABLE %s" % tableName + sqlCmd += " ( jobid INT UNSIGNED" + sqlCmd += ", jobname VARCHAR(255)" + sqlCmd += ", groupid VARCHAR(255)" + sqlCmd += ", command TEXT" + sqlCmd += ", launcher VARCHAR(1024)" + sqlCmd += ", queue VARCHAR(255)" + sqlCmd += ", status VARCHAR(255)" + sqlCmd += ", time DATETIME" + sqlCmd += ", node VARCHAR(255) )" + self._iDb.execute(sqlCmd) + expLIndex = ["ijobid", "ijobname", "igroupid", "istatus"] + + self._iDb.createIndex(tableName, 'jobs') + + sqlCmd = "SHOW INDEX FROM %s" % tableName + self._iDb.execute(sqlCmd) + results = self._iDb.cursor.fetchall() + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + for index in obsLIndex: + sqlCmd = "DROP INDEX %s ON %s" % (index, tableName) + self._iDb.execute(sqlCmd) + self._iDb.dropTable(tableName) + + self.assertEquals(expLIndex, obsLIndex) + + def test_createTable_job( self ): + tableName = "dummyTable%s" % self._uniqId + expTuples = () + + self._iDb.createTable(tableName,'jobs') + + sqlCmd = "SELECT * FROM %s" % tableName + self._iDb.execute(sqlCmd) + obsTuples = self._iDb.cursor.fetchall() + self._iDb.dropTable(tableName) + + self.assertEquals(expTuples, obsTuples) + + def test_createIndex_length(self): + tableName = "dummyTable%s" % self._uniqId + sqlCmd = "CREATE TABLE %s (accession varchar(255), length int unsigned)" % tableName + self._iDb.execute(sqlCmd) + expLIndex = ["iacc", "ilength"] + + 
self._iDb.createIndex(tableName,'length') + + sqlCmd = "SHOW INDEX FROM %s" % tableName + self._iDb.execute(sqlCmd) + results = self._iDb.cursor.fetchall() + + obsLIndex = [] + for tuple in results: + obsLIndex.append(tuple[2]) + + for index in obsLIndex: + sqlCmd = "DROP INDEX %s ON %s" % (index, tableName) + self._iDb.execute(sqlCmd) + self._iDb.dropTable(tableName) + + self.assertEquals(expLIndex, obsLIndex) + + def test_createTable_length( self ): + tableName = "dummyLengthTable%s" % self._uniqId + seqFileName = "dummyFile.fa" + seqF = open( seqFileName, "w" ) + seqF.write(">acc1 seq1\n") + seqF.write("ATACTTCGCTAGCTCGC\n") + seqF.write(">acc2 seq2\n") + seqF.write("ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC\n") + seqF.close() + + expTuple1 = ("acc1", 17) + expTuple2 = ("acc2", 68) + expTTuples = (expTuple1, expTuple2) + + self._iDb.createTable(tableName, "length", seqFileName) + + sqlCmd = "SELECT * FROM %s" % tableName + self._iDb.execute(sqlCmd) + obsTTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(seqFileName) + + self.assertEquals(expTTuples, obsTTuples) + + def test_createTable_with_overwrite_Map( self ): + tableName = "dummyMapTable" + self._uniqId + sqlCmd = "CREATE TABLE %s ( dummyColumn varchar(255) )" % ( tableName ) + self._iDb.execute( sqlCmd ) + + fileName = "dummyMapFile.txt" + mapF = open( fileName, "w" ) + mapF.write( "map1\tseq1\t20\t50\n" ) + mapF.write( "map2\tseq2\t700\t760\n" ) + mapF.close() + + expMapTuple1 = ("map1", "seq1", 20L, 50L) + expMapTuple2 = ("map2", "seq2", 700L, 760L) + expTMapTuples = (expMapTuple1, expMapTuple2) + + self._iDb.createTable(tableName, "Map", fileName, True) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTMapTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(fileName) + + self.assertEquals( expTMapTuples, obsTMapTuples ) + + def test_createTable_without_overwrite_Align( self ): + 
tableName = "dummyAlignTable" + self._uniqId + alignFileName = "dummyAlignFile.txt" + alignF = open( alignFileName, "w" ) + alignF.write( "query1\t1\t100\tsubject1\t1\t150\t0.5\t15\t35\n" ) + alignF.write( "query2\t1\t100\tsubject2\t1\t150\t0.5\t15\t35\n" ) + alignF.close() + + expAlignTuple1 = ("query1", 1L, 100L, "subject1", 1L, 150L, 0.5, 15L, 35) + expAlignTuple2 = ("query2", 1L, 100L, "subject2", 1L, 150L, 0.5, 15L, 35) + expTAlignTuples = (expAlignTuple1, expAlignTuple2) + + self._iDb.createTable(tableName, "align", alignFileName, False) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTAlignTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(alignFileName) + + self.assertEquals( expTAlignTuples, obsTAlignTuples ) + + def test_createTable_without_overwrite_Match( self ): + tableName = "dummyMatchTable" + self._uniqId + matchFileName = "dummyMatchFile.txt" + matchF = open( matchFileName, "w" ) + matchF.write( "qry1\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" ) + matchF.write( "qry2\t700\t760\t60\t100\t100\tsbj2\t500\t560\t60\t100\t1e-123\t136\t98.4\t2\n" ) + matchF.close() + + expMatchTuple = ("qry2", 700L, 760L, 60L, 100.0, 100.0, "sbj2", 500L, 560L, 60L, 100.0, 1e-123, 136L, 98.4, 2L) + expTMatchTuples = (expMatchTuple,) + + self._iDb.createTable(tableName, "tab", matchFileName, False) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTMatchTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(matchFileName) + + self.assertEquals( expTMatchTuples, obsTMatchTuples ) + + def test_createTable_without_overwrite_Path( self ): + tableName = "dummyPathTable" + self._uniqId + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + 
expPathTuple1 = (1L, "qry", 1L, 100L, "sbj", 1L, 100L, 1e-123, 136L, 98.4) + expPathTuple2 = (2L, "qry", 401L, 500L, "sbj", 100L, 1L, 1e-152, 161L, 98.7) # change coordinates + expTPathTuples = (expPathTuple1, expPathTuple2) + + self._iDb.createTable(tableName, "Path", pathFileName, False) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTPathTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expTPathTuples, obsTPathTuples ) + + def test_createTable_without_overwrite_Set( self ): + tableName = "dummySetTable" + self._uniqId + setFileName = "dummySetFile.txt" + setF = open( setFileName, "w" ) + setF.write( "15\tset1\tchr1\t1\t100\n" ) + setF.write( "15\tset2\tchr2\t1\t100\n" ) + setF.close() + + expSetTuple1 = (15L, "set1", "chr1", 1L, 100L) + expSetTuple2 = (15L, "set2", "chr2", 1L, 100L) + expTSetTuples = (expSetTuple1, expSetTuple2) + + self._iDb.createTable(tableName, "Set", setFileName, False) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTSetTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(setFileName) + + self.assertEquals( expTSetTuples, obsTSetTuples ) + + def test_createTable_without_overwrite_Seq( self ): + tableName = "dummySeqTable" + self._uniqId + seqFileName = "dummySeqFile.txt" + seqF = open( seqFileName, "w" ) + seqF.write( ">acc1 seq1\n" ) + seqF.write( "ATACTTCGCTAGCTCGC\n" ) + seqF.write( ">acc2 seq2\n" ) + seqF.write( "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC\n" ) + seqF.close() + + expSeqTuple1 = ("acc1", "ATACTTCGCTAGCTCGC", "acc1 seq1", 17L) + expSeqTuple2 = ("acc2", "ATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGCATACTTCGCTAGCTCGC", "acc2 seq2", 68L) + expTSeqTuples = (expSeqTuple1, expSeqTuple2) + + self._iDb.createTable(tableName, "fasta", seqFileName, False) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd 
) + obsTSeqTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(seqFileName) + + self.assertEquals( expTSeqTuples, obsTSeqTuples ) + + def test_createTable_with_overwrite_Classif( self ): + tableName = "dummyClassifTable" + self._uniqId + classifFileName = "dummyClassifFile.txt" + with open( classifFileName, "w" ) as f: + f.write("RIX-incomp-chim_DmelCaf1_2_0-B-G1000-Map3\t3508\t-\tPotentialChimeric\tI\tLINE\tincomplete\tCI=36; coding=(TE_BLRtx: DMCR1A:ClassI:LINE:Jockey: 14.16%); struct=(TElength: >700bps)\n") + f.write("RLX-incomp_DmelCaf1_2_0-B-G1019-Map3\t4131\t+\tok\tI\tLTR\tincomplete\tCI=28; coding=(TE_BLRtx: ROO_I:ClassI:LTR:Bel-Pao: 43.27%, ROO_LTR:ClassI:LTR:Bel-Pao: 100.00%; TE_BLRx: BEL-6_DWil-I_2p:ClassI:LTR:Bel-Pao: 69.84%); struct=(TElength: >4000bps); other=(HG_BLRn: FBtr0087866_Dmel_r4.3: 4.72%; SSRCoverage=0.15<0.75)\n") + + self._iDb.createTable(tableName, "Classif", classifFileName, True) + + self.assertTrue(self._iDb.getSize(tableName) == 2) + self._iDb.dropTable(tableName) + os.remove(classifFileName) + + def test_createTable_no_file( self ): + lTypesToTest = TABLE_SCHEMA_DESCRIPTOR.keys() + lTypesToTest.extend(TABLE_TYPE_SYNONYMS) + for tableType in lTypesToTest: + tableName = "dummy%sTable%s" % (tableType, self._uniqId) + self._iDb.createTable(tableName, tableType) + + self.assertTrue(self._iDb.doesTableExist(tableName)) + self.assertTrue(self._iDb.isEmpty(tableName)) + + self._iDb.dropTable(tableName) + + def test_changePathQueryCoordinatesToDirectStrand(self): + tableName = "dummyPathTable" + self._uniqId + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + expPathTuple1 = (1L, "qry", 1L, 100L, "sbj", 100L, 1L, 1e-123, 136L, 98.4) + expPathTuple2 = (2L, "qry", 401L, 
500L, "sbj", 100L, 1L, 1e-152, 161L, 98.7) + expPathTuple3 = (3L, "qry", 5L, 401L, "sbj", 1L, 100L, 1e-152, 161L, 98.7) + expTPathTuples = (expPathTuple1, expPathTuple2, expPathTuple3) + + sqlCmd = "CREATE TABLE %s ( path int unsigned, query_name varchar(255), query_start int , query_end int, subject_name varchar(255), subject_start int unsigned, subject_end int unsigned, E_value double, score int unsigned, identity float)" % tableName + self._iDb.execute( sqlCmd ) + + self._iDb.loadDataFromFile(tableName, pathFileName, False) + self._iDb.changePathQueryCoordinatesToDirectStrand(tableName) + + sqlCmd = "SELECT * FROM %s" % ( tableName ) + self._iDb.execute( sqlCmd ) + obsTPathTuples = self._iDb.cursor.fetchall() + + self._iDb.dropTable(tableName) + os.remove(pathFileName) + + self.assertEquals( expTPathTuples, obsTPathTuples ) + + def test_exportDataToFile(self): + tableName = "dummyPathTable" + self._uniqId + expFileName = "dummyPathFile.txt" + pathF = open( expFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + self._iDb.createTable(tableName, "Path", expFileName, False) + obsFileName = "DummyObsFileName" + + self._iDb.exportDataToFile(tableName, obsFileName) + + self.assertTrue(FileUtils.isRessourceExists(obsFileName)) + self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName)) + + self._iDb.dropTable(tableName) + os.remove(expFileName) + os.remove(obsFileName) + + def test_exportDataToFile_keepFirstLineTrue(self): + tableName = "dummyPathTable" + self._uniqId + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + 
expFileName = "expPathFile.txt" + pathF = open( expFileName, "w" ) + pathF.write("path\tquery_name\tquery_start\tquery_end\tsubject_name\tsubject_start\tsubject_end\tE_value\tscore\tidentity\n") + pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + self._iDb.createTable(tableName, "Path", pathFileName, False) + obsFileName = "DummyObsFileName" + + self._iDb.exportDataToFile(tableName, obsFileName, True) + + self.assertTrue(FileUtils.isRessourceExists(obsFileName)) + self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName)) + + self._iDb.dropTable(tableName) + os.remove(expFileName) + os.remove(obsFileName) + os.remove(pathFileName) + + def test_exportDataToFile_with_keepFirstLineTrue_and_param(self): + tableName = "dummyPathTable" + self._uniqId + pathFileName = "dummyPathFile.txt" + pathF = open( pathFileName, "w" ) + pathF.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + pathF.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + pathF.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + pathF.close() + + expFileName = "expPathFile.txt" + pathF = open( expFileName, "w" ) + pathF.write("path\tquery_name\tquery_start\tquery_end\tsubject_name\tsubject_start\tsubject_end\tE_value\tscore\tidentity\n") + pathF.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + pathF.close() + + self._iDb.createTable(tableName, "Path", pathFileName, False) + obsFileName = "DummyObsFileName" + + self._iDb.exportDataToFile(tableName, obsFileName, True, "where query_name = 'qry2'") + + self.assertTrue(FileUtils.isRessourceExists(obsFileName)) + self.assertTrue(FileUtils.are2FilesIdentical(expFileName, obsFileName)) + + self._iDb.dropTable(tableName) + os.remove(expFileName) + os.remove(obsFileName) + os.remove(pathFileName) + + + def 
test_convertPathTableIntoAlignTable( self ): + inPathTable = "dummyInPathTable_%s" % ( self._uniqId ) + inPathFile = "dummyInPathFile_%s" % ( self._uniqId ) + inPathFileHandler = open( inPathFile, "w" ) + inPathFileHandler.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + inPathFileHandler.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.close() + self._iDb.createTable( inPathTable, "path", inPathFile, True ) + + expAlignFile = "dummyExpAlignFile_%s" % ( self._uniqId ) + expAlignFileHandler = open( expAlignFile, "w" ) + expAlignFileHandler.write( "qry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + expAlignFileHandler.write( "qry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + expAlignFileHandler.write( "qry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + expAlignFileHandler.close() + obsAlignTable = "dummyObsAlignTable_%s" % ( self._uniqId ) + + self._iDb.convertPathTableIntoAlignTable( inPathTable, obsAlignTable ) + + obsAlignFile = "dummyObsAlignFile_%s" % ( self._uniqId ) + self._iDb.exportDataToFile( obsAlignTable, obsAlignFile, False ) + self.assertTrue( FileUtils.are2FilesIdentical( expAlignFile, obsAlignFile ) ) + + for f in [ inPathFile, expAlignFile, obsAlignFile ]: + os.remove( f ) + for t in [ inPathTable, obsAlignTable ]: + self._iDb.dropTable( t ) + + def test_convertAlignTableIntoPathTable( self ): + inAlignTable = "dummyInPathTable_%s" % ( self._uniqId ) + inAlignFile = "dummyInPathFile_%s" % ( self._uniqId ) + inAlignFileHandler = open( inAlignFile, "w" ) + inAlignFileHandler.write( "qry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + inAlignFileHandler.write( "qry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + inAlignFileHandler.write( "qry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inAlignFileHandler.close() + self._iDb.createTable( inAlignTable, "align", inAlignFile, True ) + + expPathFile = "dummyExpPathFile_%s" % 
( self._uniqId ) + expPathFileHandler = open( expPathFile, "w" ) + expPathFileHandler.write( "1\tqry\t1\t100\tsbj\t100\t1\t1e-123\t136\t98.4\n" ) + expPathFileHandler.write( "2\tqry2\t401\t500\tsbj\t100\t1\t1e-152\t161\t98.7\n" ) + expPathFileHandler.write( "3\tqry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + expPathFileHandler.close() + obsPathTable = "dummyObsPathTable_%s" % ( self._uniqId ) + + self._iDb.convertAlignTableIntoPathTable( inAlignTable, obsPathTable ) + + obsPathFile = "dummyObsAlignFile_%s" % ( self._uniqId ) + self._iDb.exportDataToFile( obsPathTable, obsPathFile, False ) + self.assertTrue( FileUtils.are2FilesIdentical( expPathFile, obsPathFile ) ) + + for f in [ inAlignFile, expPathFile, obsPathFile ]: + os.remove( f ) + for t in [ inAlignTable, obsPathTable ]: + self._iDb.dropTable( t ) + + def test_convertAlignTableIntoPathTable_with_single_quote( self ): + inAlignTable = "dummyInPathTable_%s" % ( self._uniqId ) + inAlignFile = "dummyInPathFile_%s" % ( self._uniqId ) + inAlignFileHandler = open( inAlignFile, "w" ) + inAlignFileHandler.write( "qry\t1\t100\t'sbj\t100\t1\t1e-123\t136\t98.4\n" ) + inAlignFileHandler.write( "qry2\t401\t500\tsbj'\t100\t1\t1e-152\t161\t98.7\n" ) + inAlignFileHandler.write( "qry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inAlignFileHandler.close() + self._iDb.createTable( inAlignTable, "align", inAlignFile, True ) + + expPathFile = "dummyExpPathFile_%s" % ( self._uniqId ) + expPathFileHandler = open( expPathFile, "w" ) + expPathFileHandler.write( "1\tqry\t1\t100\t'sbj\t100\t1\t1e-123\t136\t98.4\n" ) + expPathFileHandler.write( "2\tqry2\t401\t500\tsbj'\t100\t1\t1e-152\t161\t98.7\n" ) + expPathFileHandler.write( "3\tqry3\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + expPathFileHandler.close() + obsPathTable = "dummyObsPathTable_%s" % ( self._uniqId ) + + self._iDb.convertAlignTableIntoPathTable( inAlignTable, obsPathTable ) + + obsPathFile = "dummyObsAlignFile_%s" % ( self._uniqId ) + self._iDb.exportDataToFile( 
obsPathTable, obsPathFile, False ) + self.assertTrue( FileUtils.are2FilesIdentical( expPathFile, obsPathFile ) ) + + for f in [ inAlignFile, expPathFile, obsPathFile ]: + os.remove( f ) + for t in [ inAlignTable, obsPathTable ]: + self._iDb.dropTable( t ) + + def test_getObjectListWithSQLCmd(self): + inPathTable = "dummyInPathTable_%s" % ( self._uniqId ) + inPathFile = "dummyInPathFile_%s" % ( self._uniqId ) + inPathFileHandler = open( inPathFile, "w" ) + inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.close() + self._iDb.createTable( inPathTable, "path", inPathFile, True ) + + path1 = Path() + path1.setFromTuple((1, "qry", 1, 100, "sbj", 100, 1, 1e-123, 136, 98.4)) + path2 = Path() + path2.setFromTuple((2, "qry", 401, 500, "sbj", 100, 1, 1e-152, 161, 98.7)) + path3 = Path() + path3.setFromTuple((3, "qry", 5, 401, "sbj", 1, 100, 1e-152, 161, 98.7)) + expLPath = [path1, path2, path3] + sqlCmd = "SELECT * FROM %s;" % (inPathTable) + obsLPath = self._iDb.getObjectListWithSQLCmd(sqlCmd, self._getInstanceToAdapt) + + os.remove( inPathFile ) + self._iDb.dropTable( inPathTable ) + + self.assertEquals(expLPath, obsLPath) + + def test_getIntegerListWithSQLCmd(self): + inPathTable = "dummyInPathTable_%s" % ( self._uniqId ) + inPathFile = "dummyInPathFile_%s" % ( self._uniqId ) + inPathFileHandler = open( inPathFile, "w" ) + inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.close() + self._iDb.createTable( inPathTable, "path", inPathFile, True ) + + expLPath = [1, 2, 3] + sqlCmd = "SELECT * FROM %s;" % (inPathTable) + obsLPath = 
self._iDb.getIntegerListWithSQLCmd(sqlCmd) + + os.remove( inPathFile ) + self._iDb.dropTable( inPathTable ) + + self.assertEquals(expLPath, obsLPath) + + def test_getIntegerWithSQLCmd(self): + inPathTable = "dummyInPathTable_%s" % ( self._uniqId ) + inPathFile = "dummyInPathFile_%s" % ( self._uniqId ) + inPathFileHandler = open( inPathFile, "w" ) + inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.close() + self._iDb.createTable( inPathTable, "path", inPathFile, True ) + + expId = 1 + sqlCmd = "SELECT path FROM %s where path='%d';" % (inPathTable, 1) + obsId = self._iDb.getIntegerWithSQLCmd(sqlCmd) + + os.remove( inPathFile ) + self._iDb.dropTable( inPathTable ) + + self.assertEquals(expId, obsId) + + def test_getStringListWithSQLCmd(self): + inPathTable = "dummyInPathTable_%s" % ( self._uniqId ) + inPathFile = "dummyInPathFile_%s" % ( self._uniqId ) + inPathFileHandler = open( inPathFile, "w" ) + inPathFileHandler.write( "1\tqry\t100\t1\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + inPathFileHandler.write( "2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.write( "3\tqry\t5\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.close() + self._iDb.createTable( inPathTable, "path", inPathFile, True ) + + expLString = ["qry","qry","qry"] + sqlCmd = "SELECT query_name FROM %s;" % (inPathTable) + obsLString = self._iDb.getStringListWithSQLCmd(sqlCmd) + + os.remove( inPathFile ) + self._iDb.dropTable( inPathTable ) + + self.assertEquals(expLString, obsLString) + + def test_removeDoublons( self ): + inPathTable = "dummyInPathTable_%s" % ( self._uniqId ) + inPathFile = "dummyInPathFile_%s" % ( self._uniqId ) + inPathFileHandler = open( inPathFile, "w" ) + inPathFileHandler.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + 
inPathFileHandler.write( "2\tqry\t401\t500\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.write( "2\tqry\t401\t500\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + inPathFileHandler.close() + self._iDb.createTable( inPathTable, "path", inPathFile, True ) + + expFile = "dummyExpFile_%s" % ( self._uniqId ) + expFileHandler = open( expFile, "w" ) + expFileHandler.write( "1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n" ) + expFileHandler.write( "2\tqry\t401\t500\tsbj\t1\t100\t1e-152\t161\t98.7\n" ) + expFileHandler.close() + + self._iDb.removeDoublons( inPathTable ) + + obsFile = "dummyObsFile_%s" % ( self._uniqId ) + self._iDb.exportDataToFile(inPathTable, obsFile) + + self.assertTrue( FileUtils.are2FilesIdentical( expFile, obsFile ) ) + + self._iDb.dropTable( inPathTable ) + for f in [ inPathFile, expFile, obsFile ]: + os.remove( f ) + + def test_getTableListFromPattern_oneTable( self ): + inTable = "dummyInTable_%s" % ( self._uniqId ) + self._iDb.createTable( inTable, "path", "", True ) + exp = [ inTable ] + obs = self._iDb.getTableListFromPattern( "%s%%" % inTable ) + self.assertEqual( exp, obs ) + self._iDb.dropTable( inTable ) + + def test_getTableListFromPattern_twoTables( self ): + inTable1 = "dummyInTable1_%s" % ( self._uniqId ) + inTable2 = "dummyInTable2_%s" % ( self._uniqId ) + inTable3 = "dummyTotoTable3_%s" % ( self._uniqId ) + for table in [ inTable1, inTable2, inTable3 ]: + self._iDb.createTable( table, "path", "", True ) + exp = [ inTable1, inTable2 ] + obs = self._iDb.getTableListFromPattern( "dummyInTable%%_%s" % self._uniqId ) + self.assertEqual( exp, obs ) + for table in [ inTable1, inTable2, inTable3 ]: + self._iDb.dropTable( table ) + + def test_createPathStatTable(self): + statsFileName = "DmelCaf1_statsPerClassif.txt" + f = open (statsFileName, "w") + 
f.write("family\tmaxLength\tmeanLength\tcovg\tfrags\tfullLgthFrags\tcopies\tfullLgthCopies\tmeanId\tsdId\tminId\tq25Id\tmedId\tq75Id\tmaxId\tmeanLgth\tsdLgth\tminLgth\tq25Lgth\tmedLgth\tq75Lgth\tmaxLgth\tmeanLgthPerc\tsdLgthPerc\tminLgthPerc\tq25LgthPerc\tmedLgthPerc\tq75LgthPerc\tmaxLgthPerc\n") + f.write("Helitron\t2367\t2367\t138367\t852\t0\t803\t0\t81.20\t4.24\t68.55\t78.32\t81.03\t83.49\t100.00\t172.46\t184.92\t21\t70.00\t129.00\t216.00\t2202\t7.29\t7.81\t0.89\t2.96\t5.45\t9.13\t93.03\n") + f.write("LINE\t7688\t7688\t3769377\t8358\t10\t6329\t10\t85.52\t8.02\t62.80\t79.27\t83.33\t92.88\t100.00\t597.97\t980.29\t21\t117.00\t256.00\t537.00\t7726\t7.78\t12.75\t0.27\t1.52\t3.33\t6.98\t100.49\n") + f.write("LTR\t13754\t13754\t9146587\t20749\t0\t17868\t1\t82.69\t7.39\t58.76\t77.81\t80.82\t85.67\t100.00\t519.75\t1217.12\t20\t105.00\t183.50\t336.00\t13738\t3.78\t8.85\t0.15\t0.76\t1.33\t2.44\t99.88\n") + f.write("MITE\t378\t378\t2890\t10\t3\t9\t3\t98.78\t1.20\t95.80\t98.64\t99.18\t99.46\t99.73\t325.33\t47.86\t253\t290.00\t333.00\t362.00\t390\t86.07\t12.66\t66.93\t76.72\t88.10\t95.77\t103.17\n") + f.write("NoCat\t9999\t9999\t384076\t1297\t1\t1219\t1\t82.60\t6.73\t61.20\t78.37\t81.41\t85.29\t100.00\t323.01\t686.85\t21\t64.00\t139.00\t280.00\t10000\t3.23\t6.87\t0.21\t0.64\t1.39\t2.80\t100.01\n") + f.write("SSR\t680\t680\t325152\t2340\t24\t2290\t28\t79.07\t3.60\t69.19\t76.64\t79.02\t81.10\t97.83\t221.64\t139.84\t21\t121.00\t183.00\t285.00\t799\t32.59\t20.57\t3.09\t17.79\t26.91\t41.91\t117.50\n") + f.write("TIR\t2532\t2532\t700173\t2503\t5\t2160\t5\t84.70\t7.43\t64.03\t79.46\t82.77\t90.09\t100.00\t326.54\t405.94\t21\t90.00\t187.00\t342.00\t2758\t12.90\t16.03\t0.83\t3.55\t7.39\t13.51\t108.93\n") + f.write("confused\t19419\t19419\t1299224\t3903\t0\t3311\t0\t82.30\t6.34\t63.20\t78.17\t80.81\t84.58\t100.00\t408.22\t989.57\t21\t113.00\t207.00\t339.00\t17966\t2.10\t5.10\t0.11\t0.58\t1.07\t1.75\t92.52\n") + f.close() + tableName = 
"dummyDmelCaf1_chr_allTEs_nr_noSSR_join_path_statsPerClassif" + self._iDb.createTable(tableName, "pathstat", statsFileName) + + self.assertTrue(self._iDb.doesTableExist(tableName)) + + expSize = 8 + obsSize = self._iDb.getSize(tableName) + self.assertEquals(expSize, obsSize) + + expColumnNb = 29 + sqlCmd = "DESC %s;" % tableName + self._iDb.execute(sqlCmd) + res = self._iDb.fetchall() + obsColumnNb = len(res) + self.assertEquals(expColumnNb, obsColumnNb) + + self._iDb.dropTable(tableName) + os.remove(statsFileName) + + def test_createJobTable_is_table_created(self): + tableName = "dummyJobTable" + self._uniqId + self._iDb.createTable(tableName, "jobs") + self.assertTrue(self._iDb.doesTableExist(tableName)) + self._iDb.dropTable(tableName) + + def test_createClassifTable(self): + tableName = "dummyClassifTable" + self._iDb.dropTable(tableName) + fileName = "test.classif" + + with open(fileName, "w") as f: + f.write("RIX-incomp-chim_DmelCaf1_2_0-B-G1000-Map3\t3508\t-\tPotentialChimeric\tI\tLINE\tincomplete\tCI=36; coding=(TE_BLRtx: DMCR1A:ClassI:LINE:Jockey: 14.16%, FW3_DM:ClassI:LINE:Jockey: 15.07%; TE_BLRx: CR1-1_DWil_2p:ClassI:LINE:Jockey: 18.98%, FW2_DM-ORF1p:ClassI:LINE:Jockey: 22.36%, Jockey-1_DYa_1p:ClassI:LINE:Jockey: 11.86%); struct=(TElength: >700bps); other=(TE_BLRx: Gypsy7-I_Dmoj_1p:ClassI:LTR:Gypsy: 12.58%; HG_BLRn: FBtr0089196_Dmel_r4.3: 11.74%; SSRCoverage=0.12<0.75)\n") + f.write("RLX-incomp_DmelCaf1_2_0-B-G1019-Map3\t4131\t+\tok\tI\tLTR\tincomplete\tCI=28; coding=(TE_BLRtx: ROO_I:ClassI:LTR:Bel-Pao: 43.27%, ROO_LTR:ClassI:LTR:Bel-Pao: 100.00%; TE_BLRx: BEL-6_DWil-I_2p:ClassI:LTR:Bel-Pao: 69.84%); struct=(TElength: >4000bps); other=(HG_BLRn: FBtr0087866_Dmel_r4.3: 4.72%; SSRCoverage=0.15<0.75)\n") + f.write("RLX-incomp_DmelCaf1_2_0-B-G1025-Map3\t6534\t-\tok\tI\tLTR\tincomplete\tCI=28; coding=(TE_BLRtx: Gypsy2-I_Dmoj:ClassI:LTR:Gypsy: 11.82%, MDG3_DM:ClassI:LTR:Gypsy: 17.43%, STALKER2_LTR:ClassI:LTR:Gypsy: 14.62%, STALKER4_LTR:ClassI:LTR:Gypsy: 57.21%; 
TE_BLRx: Gypsy-16_DWil-I_1p:ClassI:LTR:Gypsy: 32.19%; profiles: PF00665.18_rve_INT_32.0: 68.64%); struct=(TElength: >4000bps); other=(HG_BLRn: FBtr0070036_Dmel_r4.3: 3.73%; TermRepeats: non-termLTR: 1701; SSRCoverage=0.14<0.75)\n") + + self._iDb.createTable(tableName, "classif", fileName) + self.assertTrue(self._iDb.doesTableExist(tableName)) + + expColumnNb = 8 + sqlCmd = "DESC %s;" % tableName + self._iDb.execute(sqlCmd) + res = self._iDb.fetchall() + obsColumnNb = len(res) + self.assertEquals(expColumnNb, obsColumnNb) + + expSize = 3 + obsSize = self._iDb.getSize(tableName) + self.assertEquals(expSize, obsSize) + + expLIndex = ["iseq_name", "istatus", "iclass", "iorder", "icomp"] + sqlCmd = "SHOW INDEX FROM %s" % tableName + self._iDb.execute(sqlCmd) + res = self._iDb.cursor.fetchall() + obsLIndex = [] + for tuple in res: + obsLIndex.append(tuple[2]) + self.assertEquals(expLIndex, obsLIndex) + + self._iDb.dropTable(tableName) + os.remove(fileName) + + def test_createClassifIndex(self): + tableName = "dummyclassifTable%s" % self._uniqId + sqlCmd = "CREATE TABLE %s (seq_name varchar(255), length int unsigned, strand char, status varchar(255), class_classif varchar(255), order_classif varchar(255), completeness varchar(255), evidences text);" % tableName + self._iDb.execute(sqlCmd) + expLIndex = ["iseq_name", "istatus", "iclass", "iorder", "icomp"] + + self._iDb.createIndex(tableName, "classif") + + sqlCmd = "SHOW INDEX FROM %s" % tableName + self._iDb.execute(sqlCmd) + res = self._iDb.cursor.fetchall() + + obsLIndex = [] + for tuple in res: + obsLIndex.append(tuple[2]) + self.assertEquals(expLIndex, obsLIndex) + self._iDb.dropTable(tableName) + + def test_createBinPathTable(self): + pathFileName = "dummy.path" + with open(pathFileName, "w") as pathF: + pathF.write("1\tqry\t1\t100\tsbj\t1\t100\t1e-123\t136\t98.4\n") + pathF.write("2\tqry\t500\t401\tsbj\t1\t100\t1e-152\t161\t98.7\n") + + expPathTuple1 = (1, 1000000, "qry", 1, 100, 1) + expPathTuple2 = (2, 1000000, 
"qry", 401, 500, 1) # change coordinates + expTPathTuples = (expPathTuple1, expPathTuple2) + + pathTableName = "dummy_path" + idxTableName = "dummy_path_idx" + self._iDb.createTable(pathTableName, "path", pathFileName) + self._iDb.createBinPathTable(pathTableName, True) + + sqlCmd = "SELECT * FROM %s" % idxTableName + self._iDb.execute(sqlCmd) + obsTPathTuples = self._iDb.fetchall() + + self._iDb.dropTable(pathTableName) + self._iDb.dropTable(idxTableName) + os.remove(pathFileName) + + self.assertEquals(expTPathTuples, obsTPathTuples) + + def test_createBinSetTable(self): + setFileName = "dummy.set" + with open(setFileName, "w") as setF: + setF.write("1\tseq1\tchr1\t1900\t3900\n") + setF.write("2\tseq2\tchr1\t2\t9\n") + setF.write("3\tseq3\tchr1\t8\t13\n") + + expTuple = ((1L, 10000.0, 'chr1', 1900L, 3900L, 1L), (2L, 1000.0, 'chr1', 2L, 9L, 1L), (3L, 1000.0, 'chr1', 8L, 13L, 1L)) + + setTableName = "dummy_set" + idxTableName = "dummy_set_idx" + self._iDb.createTable(setTableName, "set", setFileName) + self._iDb.createBinSetTable(setTableName, True) + + sqlCmd = "SELECT * FROM %s" % idxTableName + self._iDb.execute(sqlCmd) + obsTuple = self._iDb.fetchall() + + self._iDb.dropTable(setTableName) + self._iDb.dropTable(idxTableName) + os.remove(setFileName) + + self.assertEquals(expTuple, obsTuple) + + def _getInstanceToAdapt(self): + iPath = Path() + return iPath + +if __name__ == "__main__": + unittest.main() \ No newline at end of file