Mercurial > repos > yufei-luo > s_mart
diff smart_toolShed/commons/core/coord/test/Test_Align.py @ 0:e0f8dcca02ed
Uploaded S-MART tool. A toolbox manages RNA-Seq and ChIP-Seq data.
author | yufei-luo |
---|---|
date | Thu, 17 Jan 2013 10:52:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/smart_toolShed/commons/core/coord/test/Test_Align.py Thu Jan 17 10:52:14 2013 -0500 @@ -0,0 +1,518 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + + +import unittest +import os +import time +from commons.core.coord.Align import Align +from commons.core.coord.Map import Map +from commons.core.utils.FileUtils import FileUtils +from commons.core.coord.Range import Range + + +class Test_Align( unittest.TestCase ): + + def setUp(self): + self._align = Align() + self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S"), os.getpid() ) + + def tearDown(self): + self._align = None + + def test_isEmpty_True(self): + alignInstance = Align() + + self.assertTrue(alignInstance.isEmpty()) + + def test_isEmpty_True_query_is_empty(self): + alignInstance = Align() + line = "\t-1\t-1\tTE2\t3\t10\t1e-20\t30\t90.2\n" + alignInstance.setFromString(line) + + self.assertTrue(alignInstance.isEmpty()) + + def test_isEmpty_True_subject_is_empty(self): + alignInstance = Align() + line = "chr1\t2\t20\t\t-1\t-1\t1e-20\t30\t90.2\n" + alignInstance.setFromString(line) + + self.assertTrue(alignInstance.isEmpty()) + + def test_isEmpty_False(self): + alignInstance = Align() + line = "chr1\t2\t20\tTE2\t3\t10\t1e-20\t30\t90.2\n" + alignInstance.setFromString(line) + + self.assertFalse(alignInstance.isEmpty()) + + def test_read(self): + line = "chr2\t1\t10\tTE3\t11\t17\t1e-20\t30\t90.2\n" + expReturn = 1 + + dummyMockAlignFile = "dummyMockAlignFile" + mockAlignFileHandle = open(dummyMockAlignFile, "w") + mockAlignFileHandle.write(line) + mockAlignFileHandle.close() + + expAlignInstance = Align() + expAlignInstance.setFromString(line) + + mockAlignFileHandle = open(dummyMockAlignFile, "r") + obsAlignInstance = Align() + obsReturn = obsAlignInstance.read(mockAlignFileHandle) + + mockAlignFileHandle.close() + os.remove(dummyMockAlignFile) + + self.assertEquals(expAlignInstance, obsAlignInstance) + self.assertEquals(expReturn, obsReturn) + + def test_read_empty_file(self): + expReturn = 0 + + dummyMockAlignFile = "dummyMockAlignFile" + mockAlignFileHandle = open(dummyMockAlignFile, "w") + mockAlignFileHandle.close() + + mockAlignFileHandle = open(dummyMockAlignFile, "r") + obsAlignInstance = Align() + obsReturn = obsAlignInstance.read(mockAlignFileHandle) + os.remove(dummyMockAlignFile) + + self.assertEquals(expReturn, obsReturn) + + def test_write (self): + expAlignFile = "expMockAlignFile" + expAlignFileHandle = open(expAlignFile, "w") + expLine = "chr1\t1\t10\tTE2\t3\t10\t0\t30\t90.200000\n" + expAlignFileHandle.write(expLine) + expAlignFileHandle.close() + + obsAlignFile = "obsAlignFile" + obsAlignFileHandle = open(obsAlignFile, "w") + obsAlignInstance = Align() + obsAlignTuple = ("chr1", 1, 10, "TE2", 3, 10, 0.0, 30, 90.2) + obsAlignInstance.setFromTuple(obsAlignTuple) + obsAlignInstance.write(obsAlignFileHandle) + obsAlignFileHandle.close() + + self.assertTrue( FileUtils.are2FilesIdentical( expAlignFile, obsAlignFile ) ) + os.remove(expAlignFile) + os.remove(obsAlignFile) + + def test_merge (self): + alignInstanceChr1 = Align() + alignInstanceChr2 = Align() + + line1 = "chr1\t1\t10\tTE2\t3\t10\t1e-20\t30\t90.2\n" + line2 = "chr2\t1\t10\tTE2\t11\t17\t1e-20\t30\t90.2\n" + + alignInstanceChr1.setFromString(line1) + alignInstanceChr2.setFromString(line2) + + expResult = None + obsResult = alignInstanceChr1.merge(alignInstanceChr2) + + self.assertEquals(expResult, obsResult) + + line1 = "chr1\t1\t10\tTE2\t3\t10\t1e-20\t30\t90.2\n" + line2 = "chr1\t1\t10\tTE3\t11\t17\t1e-20\t30\t90.2\n" + + alignInstanceTE2 = Align() + alignInstanceTE3 = Align() + + alignInstanceTE2.setFromString(line1) + alignInstanceTE3.setFromString(line2) + + expResult = None + obsResult = alignInstanceTE2.merge(alignInstanceTE3) + + self.assertEquals(expResult, obsResult) + + def test_merge_plus_strand1 (self): + alignInstance1 = Align() + alignInstance2 = Align() + + line1 = "chr1\t2\t20\tTE2\t3\t10\t1e-20\t30\t90.2\n" + line2 = "chr1\t1\t10\tTE2\t1\t9\t1e-20\t30\t90.2\n" + + alignInstance1.setFromString(line1) + alignInstance2.setFromString(line2) + + expLine = "chr1\t1\t20\tTE2\t1\t10\t1e-20\t30\t90.2\n" + expAlign = Align() + expAlign.setFromString(expLine) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge_plus_strand2 (self ): + alignInstance1 = Align() + alignInstance2 = Align() + + line1 = "chr1\t2\t20\tTE2\t3\t10\t1e-20\t30\t90.2\n" + line2 = "chr1\t4\t30\tTE2\t4\t12\t1e-20\t30\t90.2\n" + + alignInstance1.setFromString(line1) + alignInstance2.setFromString(line2) + + expLine = "chr1\t2\t30\tTE2\t3\t12\t1e-20\t30\t90.2\n" + expAlign = Align() + expAlign.setFromString(expLine) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge_plus_strand3 (self ): + alignInstance1 = Align() + alignInstance2 = Align() + + line1 = "chr1\t2\t10\tTE2\t3\t10\t1e-20\t30\t90.2\n" + line2 = "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n" + + alignInstance1.setFromString(line1) + alignInstance2.setFromString(line2) + + expLine = "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n" + expAlign = Align() + expAlign.setFromString(expLine) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge_plus_strand4 (self ): + alignInstance1 = Align() + alignInstance2 = Align() + + line1 = "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n" + line2 = "chr1\t2\t10\tTE2\t2\t10\t1e-20\t30\t90.2\n" + + alignInstance1.setFromString(line1) + alignInstance2.setFromString(line2) + + expLine = "chr1\t1\t20\tTE2\t1\t12\t1e-20\t30\t90.2\n" + expAlign = Align() + expAlign.setFromString(expLine) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge__neg_strand1 (self): + + rangeQuery1 = Range("chr1", 20, 2); rangeSubject1 = Range("TE2", 10, 3) + rangeQuery2 = Range("chr1", 1, 10); rangeSubject2 = Range("TE2", 1, 9) + + alignInstance1 = Align(rangeQuery1, rangeSubject1, 0, 30, 90.2) + alignInstance2 = Align(rangeQuery2, rangeSubject2, 0, 30, 90.2) + + expRangeQuery = Range("chr1", 20, 1); expRangeSubject = Range("TE2", 10, 1) + expAlign = Align(expRangeQuery, expRangeSubject, 0, 30, 90.2) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge__neg_strand2 (self): + rangeQuery1 = Range("chr1", 20, 2); rangeSubject1 = Range("TE2", 10, 3) + rangeQuery2 = Range("chr1", 4, 30); rangeSubject2 = Range("TE2", 4, 12) + + alignInstance1 = Align(rangeQuery1, rangeSubject1, 0, 30, 90.2) + alignInstance2 = Align(rangeQuery2, rangeSubject2, 0, 30, 90.2) + + expRangeQuery = Range("chr1", 30, 2); expRangeSubject = Range("TE2", 12, 3) + expAlign = Align(expRangeQuery, expRangeSubject, 0, 30, 90.2) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge_neg_strand3 (self ): + rangeQuery1 = Range("chr1", 10, 2); rangeSubject1 = Range("TE2", 10, 3) + rangeQuery2 = Range("chr1", 1, 20); rangeSubject2 = Range("TE2", 1, 12) + + alignInstance1 = Align(rangeQuery1, rangeSubject1, 0, 30, 90.2) + alignInstance2 = Align(rangeQuery2, rangeSubject2, 0, 30, 90.2) + + expRangeQuery = Range("chr1", 20, 1); expRangeSubject = Range("TE2", 12, 1) + expAlign = Align(expRangeQuery, expRangeSubject, 0, 30, 90.2) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge_neg_strand4 (self ): + rangeQuery1 = Range("chr1", 20, 1); rangeSubject1 = Range("TE2", 12, 1) + rangeQuery2 = Range("chr1", 2, 10); rangeSubject2 = Range("TE2", 2, 10) + + alignInstance1 = Align(rangeQuery1, rangeSubject1, 0, 30, 90.2) + alignInstance2 = Align(rangeQuery2, rangeSubject2, 0, 30, 90.2) + + expRangeQuery = Range("chr1", 20, 1); expRangeSubject = Range("TE2", 12, 1) + expAlign = Align(expRangeQuery, expRangeSubject, 0, 30, 90.2) + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + + def test_merge_id_score_identity_eValue (self): + rangeQuery1 = Range("chr1", 20, 1); rangeSubject1 = Range("TE2", 12, 1) + rangeQuery2 = Range("chr1", 2, 10); rangeSubject2 = Range("TE2", 2, 10) + + alignInstance1 = Align(rangeQuery1, rangeSubject1, 0.05, 20, 90.2) + alignInstance1.id = 1 + alignInstance2 = Align(rangeQuery2, rangeSubject2, 0, 30, 90.3) + alignInstance2.id = 2 + + expRangeQuery = Range("chr1", 20, 1); expRangeSubject = Range("TE2", 12, 1) + expAlign = Align(expRangeQuery, expRangeSubject, 0, 30, 90.3) + expAlign.id = 1 + + alignInstance1.merge(alignInstance2) + obsAlign = alignInstance1 + + self.assertEquals(expAlign, obsAlign) + self.assertEquals(expAlign.id, obsAlign.id) + + def test_setFromTuple_QryRev(self): + self._align.setFromTuple( ( "qry1", 100, 1, "sbj1", 201, 300, 0.0, 135, 97.2 ) ) + self.assertEqual( self._align.range_query.seqname, "qry1" ) + self.assertEqual( self._align.range_query.start, 1 ) + self.assertEqual( self._align.range_query.end, 100 ) + self.assertEqual( self._align.range_subject.seqname, "sbj1" ) + self.assertEqual( self._align.range_subject.start, 300 ) + self.assertEqual( self._align.range_subject.end, 201 ) + self.assertEqual( self._align.e_value, 0.0 ) + self.assertEqual( self._align.score, 135 ) + self.assertEquals( self._align.identity, 97.2 ) + + def test_setFromTuple_identityAsFloat(self): + self._align.setFromTuple( ( "qry1", "301", "600", "sbj1", "1", "300", "0.0", "135", 0.0) ) + self.assertEqual( self._align.range_query.seqname, "qry1" ) + self.assertEqual( self._align.range_query.start, 301 ) + self.assertEqual( self._align.range_query.end, 600 ) + self.assertEqual( self._align.range_subject.seqname, "sbj1" ) + self.assertEqual( self._align.range_subject.start, 1 ) + self.assertEqual( self._align.range_subject.end, 300 ) + self.assertEqual( self._align.e_value, float("0.0") ) + self.assertEqual( self._align.score, float("135") ) + self.assertEquals( self._align.identity, 0.0 ) + + def test_setFromString(self): + line = "chr1\t1\t10\tTE2\t11\t17\t1e-20\t30\t90.2\n" + self._align.setFromString( line ) + self.assertEqual( self._align.range_query.seqname, "chr1" ) + self.assertEqual( self._align.range_query.start, 1 ) + self.assertEqual( self._align.range_query.end, 10 ) + self.assertEqual( self._align.range_subject.seqname, "TE2" ) + self.assertEqual( self._align.range_subject.start, 11 ) + self.assertEqual( self._align.range_subject.end, 17 ) + self.assertEqual( self._align.e_value, float("1e-20") ) + self.assertEqual( self._align.score, float("30") ) + self.assertEquals( float(self._align.identity), float("90.2") ) + + def test__eq__(self): + self._align.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" ) + o = Align() + o.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" ) + self.assertEqual( self._align, o ) + o.setFromString( "chromosome1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" ) + self.assertNotEqual( self._align, o ) + o.setFromString( "chr1\t1\t6\ttranspElem2\t11\t16\t1e-20\t30\t90.2\n" ) + self.assertNotEqual( self._align, o ) + o.setFromString( "chr1\t100\t600\tTE2\t11\t16\t1e-20\t30\t90.2\n" ) + self.assertNotEqual( self._align, o ) + o.setFromString( "chr1\t1\t6\tTE2\t1100\t1600\t1e-20\t30\t90.2\n" ) + self.assertNotEqual( self._align, o ) + o.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30000\t90.2\n" ) + self.assertNotEqual( self._align, o ) + + def test_getSubjectAsMapOfQuery_direct(self): + exp = Map( name="TE2", seqname="chr1", start=1, end=6 ) + self._align.setFromString( "chr1\t1\t6\tTE2\t11\t16\t1e-20\t30\t90.2\n" ) + obs = self._align.getSubjectAsMapOfQuery() + self.assertEqual( obs, exp ) + + def test_getSubjectAsMapOfQuery_reverse(self): + exp = Map( name="TE2", seqname="chr1", start=6, end=1 ) + self._align.setFromString( "chr1\t1\t6\tTE2\t16\t11\t1e-20\t30\t90.2\n" ) + obs = self._align.getSubjectAsMapOfQuery() + self.assertEqual( obs, exp ) + + def test_writeSubjectAsMapOfQuery( self ): + self._align.setFromTuple( ( "chr3", "250", "151", "seq5", "1", "100", "1e-32", "147", "87.9" ) ) + expFile = "dummyExpFile_%s" % ( self._uniqId ) + expFileHandler = open( expFile, "w" ) + expFileHandler.write( "seq5\tchr3\t250\t151\n" ) + expFileHandler.close() + obsFile = "dummyObsFile_%s" % ( self._uniqId ) + obsFileHandler = open( obsFile, "w" ) + self._align.writeSubjectAsMapOfQuery( obsFileHandler ) + obsFileHandler.close() + self.assertTrue( FileUtils.are2FilesIdentical( expFile, obsFile ) ) + for f in [ expFile, obsFile ]: + if os.path.exists( f ): + os.remove( f ) + + def test_areQrySbjOnOppositeStrands(self): + self._align.setFromTuple( ( "qry1", "1", "100", "sbj1", "1", "100", "0.0", "135", "95.7" ) ) + obs = self._align.areQrySbjOnOppositeStrands() + self.assertFalse( obs ) + self._align.setFromTuple( ( "qry1", "600", "301", "sbj1", "1", "300", "0.0", "135", "95.7" ) ) + obs = self._align.areQrySbjOnOppositeStrands() + self.assertTrue( obs ) + + def test_reverse(self): + line = "chr1\t1\t10\tTE2\t11\t17\t1e-20\t30\t90.2" + expLine = "chr1\t10\t1\tTE2\t17\t11\t1e-20\t30\t90.200000" + + obsAlignInstance = Align() + obsAlignInstance.setFromString(line) + obsAlignInstance.reverse() + obsLine = obsAlignInstance.toString() + + self.assertEquals(expLine, obsLine) + + def test_getMapsOfQueryAndSubject(self): + self._align.setFromTuple( ( "qry1", "1", "100", "sbj1", "1", "100", "0.0", "135", "95.7" ) ) + + expMapQuery = Map() + expMapQuery.setFromTuple( ( "repet", "qry1", "1", "100" ) ) + expMapSubject = Map() + expMapSubject.setFromTuple( ( "repet", "sbj1", "1", "100" ) ) + + obsMapQuery, obsMapSubject = self._align.getMapsOfQueryAndSubject() + + self.assertEqual( expMapQuery, obsMapQuery ) + self.assertEqual( expMapSubject, obsMapSubject ) + + def test_getBin_bin_level_9(self): + tuple = ("chr1","190000000","390000000","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + expRes = 100000000.0 + obsRes = self._align.getBin() + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_8(self): + tuple = ("chr1","19000000","39000000","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + expRes = 100000000.0 + obsRes = self._align.getBin() + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_7(self): + tuple = ("chr1","1900000","3900000","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + expRes = 10000000.0 + obsRes = self._align.getBin() + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_6(self): + tuple = ("chr1","190000","390000","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + obsRes = self._align.getBin() + expRes = 1000000.0 + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_5(self): + tuple = ("chr1","19000","39000","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + obsRes = self._align.getBin() + expRes = 100000.0 + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_4(self): + tuple = ("chr1","1900","3900","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + obsRes = self._align.getBin() + expRes = 10000.0 + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_3(self): + tuple = ("chr1","190","390","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + obsRes = self._align.getBin() + expRes = 1000.0 + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_2(self): + tuple = ("chr1","19","39","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + obsRes = self._align.getBin() + expRes = 1000.0 + self.assertEquals(expRes, obsRes) + + def test_getBin_bin_level_1(self): + tuple = ("chr1","1","3","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple(tuple) + obsRes = self._align.getBin() + expRes = 1000.0 + self.assertEquals(expRes, obsRes) + + + def test_switchQuerySubject_directS( self ): + tuple = ("chr1","1","3","TE2","11","17","1e-20","30","90.2") + self._align.setFromTuple( tuple ) + exp = Align( Range("TE2","11","17"), Range("chr1","1","3"), "1e-20", "30", "90.2" ) + self._align.switchQuerySubject() + self.assertEquals( exp, self._align ) + + + def test_switchQuerySubject_reverseS( self ): + tuple = ("chr1","1","3","TE2","17","11","1e-20","30","90.2") + self._align.setFromTuple( tuple ) + exp = Align( Range("TE2","11","17"), Range("chr1","3","1"), "1e-20", "30", "90.2" ) + self._align.switchQuerySubject() + self.assertEquals( exp, self._align ) + + + def test_toStringAsGff( self ): + self._align.setFromString( "chr1\t1\t10\tTE3\t11\t17\t1e-20\t30\t85.2\n" ) + exp = "chr1\tREPET\tmatch\t1\t10\t1e-20\t+\t.\tID=23;Target=TE3 11 17" + obs = self._align.toStringAsGff( ID="23" ) + self.assertEqual( obs, exp ) + + +test_suite = unittest.TestSuite() +test_suite.addTest( unittest.makeSuite( Test_Align ) ) +if __name__ == "__main__": + unittest.TextTestRunner(verbosity=2).run( test_suite )