Mercurial > repos > yufei-luo > s_mart
diff commons/launcher/tests/Test_LaunchBlastclust.py @ 31:0ab839023fe4
Uploaded
author | m-zytnicki |
---|---|
date | Tue, 30 Apr 2013 14:33:21 -0400 |
parents | 94ab73e8a190 |
children |
line wrap: on
line diff
# File: commons/launcher/tests/Test_LaunchBlastclust.py
# (added Tue Apr 30 14:33:21 2013 -0400, diffed against /dev/null)
import unittest
import time
import os
from commons.launcher.LaunchBlastclust import LaunchBlastclust
from commons.core.utils.FileUtils import FileUtils


class Test_LaunchBlastclust( unittest.TestCase ):
    """Unit tests for the file-based helper methods of LaunchBlastclust.

    Every test writes small fixture files into the current working
    directory.  All of them are registered in self._lFilesToRemove and
    deleted in tearDown(), so they are cleaned up even when an assertion
    fails.
    """

    def setUp(self):
        self._iLaunchBlastclust = LaunchBlastclust()
        self._iLaunchBlastclust.setClean()
        # Timestamp + PID keeps fixture names distinct between concurrent runs.
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._lFilesToRemove = []

    def tearDown(self):
        # Remove only what is still there: the code under test may already
        # have deleted some of its intermediate files.
        for fileName in self._lFilesToRemove:
            if os.path.exists(fileName):
                os.remove(fileName)
        self._lFilesToRemove = None
        self._iLaunchBlastclust = None
        self._uniqId = None

    def _registerForCleanup(self, fileName):
        """Schedule fileName for deletion in tearDown() and return it."""
        self._lFilesToRemove.append(fileName)
        return fileName

    def _writeFile(self, fileName, lChunks):
        """Write the strings of lChunks verbatim to fileName.

        No newline is added, so callers control the exact file content.
        The file is registered for cleanup before it is created.
        """
        self._registerForCleanup(fileName)
        with open(fileName, "w") as handle:
            handle.write("".join(lChunks))
        return fileName

    def _writeFiveSeqFasta(self, fileName):
        """Write a fasta file holding seq1..seq5, all with the same sequence."""
        lChunks = []
        for i in range(1, 6):
            lChunks.append(">seq%i\n" % i)
            lChunks.append("gaattgtttactta\n")
        return self._writeFile(fileName, lChunks)

    def _assertFilesIdentical(self, fileObs, fileExp):
        """Fail the test when the observed file differs from the expected one."""
        self.assertTrue(FileUtils.are2FilesIdentical(fileObs, fileExp),
                        "Files are different: '%s' vs '%s'" % (fileObs, fileExp))

    def test_getClustersFromTxtFile(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName, ["seq1 seq3 seq4 \n",
                                     "seq2 seq5 \n"])
        dExp = {1:["seq1","seq3","seq4"], 2:["seq2","seq5"]}
        self._iLaunchBlastclust.setTmpFileName(inFileName)
        dObs = self._iLaunchBlastclust.getClustersFromTxtFile()
        self.assertEqual(dObs, dExp)

    def test_getClusteringResultsInFasta_without_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFiveSeqFasta(inFileName)

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n",
                                      "seq2 seq5 \n"])
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(fileExp, [">BlastclustCluster1Mb1_seq1\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster1Mb2_seq3\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster1Mb3_seq4\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster2Mb1_seq2\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster2Mb2_seq5\n",
                                  "gaattgtttactta\n"])

        fileObs = "%s_Blastclust.fa" % os.path.splitext(inFileName)[0]
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getClusteringResultsInFasta_with_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFiveSeqFasta(inFileName)

        # seq2 and seq5 are singletons here; only cluster 1 is expected
        # in the output once filtering is switched on.
        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n",
                                      "seq2\n",
                                      "seq5\n"])
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(fileExp, [">BlastclustCluster1Mb1_seq1\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster1Mb2_seq3\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster1Mb3_seq4\n",
                                  "gaattgtttactta\n"])

        fileObs = "%s_Blastclust.fa" % os.path.splitext(inFileName)[0]
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.setFilterUnclusteredSequences()
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getLinkInitNewHeaders(self):
        inFileName = "dummyInput_%s.shortHlink" % self._uniqId
        self._writeFile(inFileName, ["seq1\tHeader1\t1\t5193\n",
                                     "seq2\tHeader2\t1\t5193\n",
                                     "seq3\tHeader3\t1\t5193\n",
                                     "seq4\tHeader4\t1\t5193\n"])

        self._iLaunchBlastclust.setInputFileName("dummyInput_%s" % self._uniqId)
        dObs = self._iLaunchBlastclust.getLinkInitNewHeaders()
        dExp = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        self.assertEqual(dObs, dExp)

    def test_retrieveInitHeaders(self):
        dIn = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        inFileName = "dummyInFile_%s" % self._uniqId
        outFilePrefix = self._uniqId

        tmpFileName = "%s_blastclust.txt" % outFilePrefix
        self._writeFile(tmpFileName, ["seq1 seq3 seq4\n",
                                      "seq2\n"])

        # Registered for cleanup by _writeFile; tearDown() removes it only
        # if it still exists after retrieveInitHeaders() has run.
        shortHFile = "%s.shortH_Blastclust.fa" % inFileName
        self._writeFile(shortHFile, [">BlastclustCluster1Mb1_seq1\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster1Mb2_seq3\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster1Mb3_seq4\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster2Mb1_seq2\n",
                                     "gaattgtttactta\n"])

        fileExp = "retrieveInitHeadersExpected.fa"
        self._writeFile(fileExp, [">BlastclustCluster1Mb1_Header1\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster1Mb2_Header3\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster1Mb3_Header4\n",
                                  "gaattgtttactta\n",
                                  ">BlastclustCluster2Mb1_Header2\n",
                                  "gaattgtttactta\n"])

        fileObs = "%s_Blastclust.fa" % outFilePrefix
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.setInputFileName(inFileName)
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        self._iLaunchBlastclust.setOutputFilePrefix(outFilePrefix)
        self._iLaunchBlastclust.retrieveInitHeaders(dIn)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_filterUnclusteredSequences(self):
        dClusterId2SeqHeaders = {1: ["seq1","seq2"], 2: ["seq3"]}
        dExp = {1: ["seq1","seq2"]}
        dObs = self._iLaunchBlastclust.filterUnclusteredSequences(dClusterId2SeqHeaders)
        self.assertEqual(dObs, dExp)

    def test_blastclustToMap(self):
        inFileName = "dummyBlastclustOut_%s.fa" % self._uniqId
        # The last sequence line deliberately has no trailing newline,
        # exactly as in the original fixture.
        self._writeFile(inFileName, [">BlastclustCluster1Mb1_chunk1 (dbseq-nr 1) [1,14]\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster1Mb2_chunk1 (dbseq-nr 1) [30,44]\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster2Mb1_chunk2 (dbseq-nr 1) [100,114]\n",
                                     "gaattgtttactta\n",
                                     ">BlastclustCluster3Mb1_chunk5 (dbseq-nr 8) [1000,1014]\n",
                                     "gaattgtttactta"])

        fileExp = "blastclustToMapExpected.map"
        self._writeFile(fileExp, ["BlastclustCluster1Mb1\tchunk1\t1\t14\n",
                                  "BlastclustCluster1Mb2\tchunk1\t30\t44\n",
                                  "BlastclustCluster2Mb1\tchunk2\t100\t114\n",
                                  "BlastclustCluster3Mb1\tchunk5\t1000\t1014\n"])

        fileObs = "%s.map" % os.path.splitext(inFileName)[0]
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.blastclustToMap(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)


if __name__ == "__main__":
    unittest.main()