Mercurial > repos > yufei-luo > s_mart
view commons/launcher/tests/Test_LaunchBlastclust.py @ 31:0ab839023fe4
Uploaded
author | m-zytnicki |
---|---|
date | Tue, 30 Apr 2013 14:33:21 -0400 |
parents | 94ab73e8a190 |
children |
line wrap: on
line source
import unittest
import time
import os
from commons.launcher.LaunchBlastclust import LaunchBlastclust
from commons.core.utils.FileUtils import FileUtils


# Nucleotide sequence shared by every FASTA record used in these tests.
SEQUENCE = "gaattgtttactta"


def _buildFasta(lHeaders):
    """Return FASTA text with one record per header, each using SEQUENCE."""
    return "".join([">%s\n%s\n" % (header, SEQUENCE) for header in lHeaders])


class Test_LaunchBlastclust(unittest.TestCase):
    """Unit tests for LaunchBlastclust.

    Each test writes its fixture files in the current working directory.
    Every file a test creates, or expects the launcher to create, is
    registered and removed in tearDown, so nothing is left behind even
    when an assertion fails.
    """

    def setUp(self):
        self._iLaunchBlastclust = LaunchBlastclust()
        self._iLaunchBlastclust.setClean()
        # Timestamp + pid keeps fixture names distinct between runs.
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._lFilesToRemove = []

    def tearDown(self):
        # The launcher may already have removed some of these files, so
        # only delete the ones that still exist.
        for fileName in self._lFilesToRemove:
            if os.path.exists(fileName):
                os.remove(fileName)
        self._lFilesToRemove = None
        self._iLaunchBlastclust = None
        self._uniqId = None

    def _registerForCleanup(self, fileName):
        """Schedule 'fileName' for removal in tearDown."""
        self._lFilesToRemove.append(fileName)

    def _writeFile(self, fileName, content):
        """Write 'content' to 'fileName' and schedule the file for removal."""
        # Register first so a partially written file is cleaned up as well.
        self._registerForCleanup(fileName)
        with open(fileName, "w") as handle:
            handle.write(content)

    def _assertFilesIdentical(self, fileObs, fileExp):
        """Fail the test unless FileUtils reports both files as identical."""
        self.assertTrue(
            FileUtils.are2FilesIdentical(fileObs, fileExp),
            "Files are different: %s vs %s" % (fileObs, fileExp))

    def test_getClustersFromTxtFile(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName,
                        "seq1 seq3 seq4 \n"
                        "seq2 seq5 \n")
        dExp = {1: ["seq1", "seq3", "seq4"], 2: ["seq2", "seq5"]}

        self._iLaunchBlastclust.setTmpFileName(inFileName)
        dObs = self._iLaunchBlastclust.getClustersFromTxtFile()

        self.assertEqual(dObs, dExp)

    def test_getClusteringResultsInFasta_without_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(
            inFileName,
            _buildFasta(["seq1", "seq2", "seq3", "seq4", "seq5"]))

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName,
                        "seq1 seq3 seq4 \n"
                        "seq2 seq5 \n")
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(
            fileExp,
            _buildFasta(["BlastclustCluster1Mb1_seq1",
                         "BlastclustCluster1Mb2_seq3",
                         "BlastclustCluster1Mb3_seq4",
                         "BlastclustCluster2Mb1_seq2",
                         "BlastclustCluster2Mb2_seq5"]))

        fileObs = "%s_Blastclust.fa" % os.path.splitext(inFileName)[0]
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getClusteringResultsInFasta_with_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(
            inFileName,
            _buildFasta(["seq1", "seq2", "seq3", "seq4", "seq5"]))

        # seq2 and seq5 are singletons; the expected output omits them.
        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName,
                        "seq1 seq3 seq4 \n"
                        "seq2\n"
                        "seq5\n")
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(
            fileExp,
            _buildFasta(["BlastclustCluster1Mb1_seq1",
                         "BlastclustCluster1Mb2_seq3",
                         "BlastclustCluster1Mb3_seq4"]))

        fileObs = "%s_Blastclust.fa" % os.path.splitext(inFileName)[0]
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.setFilterUnclusteredSequences()
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getLinkInitNewHeaders(self):
        inFileName = "dummyInput_%s.shortHlink" % self._uniqId
        self._writeFile(inFileName,
                        "seq1\tHeader1\t1\t5193\n"
                        "seq2\tHeader2\t1\t5193\n"
                        "seq3\tHeader3\t1\t5193\n"
                        "seq4\tHeader4\t1\t5193\n")
        dExp = {"seq1": "Header1",
                "seq2": "Header2",
                "seq3": "Header3",
                "seq4": "Header4"}

        # The launcher is given the name without the '.shortHlink' suffix.
        self._iLaunchBlastclust.setInputFileName(
            "dummyInput_%s" % self._uniqId)
        dObs = self._iLaunchBlastclust.getLinkInitNewHeaders()

        self.assertEqual(dObs, dExp)

    def test_retrieveInitHeaders(self):
        dIn = {"seq1": "Header1",
               "seq2": "Header2",
               "seq3": "Header3",
               "seq4": "Header4"}
        inFileName = "dummyInFile_%s" % self._uniqId
        outFilePrefix = self._uniqId

        tmpFileName = "%s_blastclust.txt" % outFilePrefix
        self._writeFile(tmpFileName,
                        "seq1 seq3 seq4\n"
                        "seq2\n")

        shortHFile = "%s.shortH_Blastclust.fa" % inFileName
        self._writeFile(
            shortHFile,
            _buildFasta(["BlastclustCluster1Mb1_seq1",
                         "BlastclustCluster1Mb2_seq3",
                         "BlastclustCluster1Mb3_seq4",
                         "BlastclustCluster2Mb1_seq2"]))

        fileExp = "retrieveInitHeadersExpected.fa"
        self._writeFile(
            fileExp,
            _buildFasta(["BlastclustCluster1Mb1_Header1",
                         "BlastclustCluster1Mb2_Header3",
                         "BlastclustCluster1Mb3_Header4",
                         "BlastclustCluster2Mb1_Header2"]))

        fileObs = "%s_Blastclust.fa" % outFilePrefix
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.setInputFileName(inFileName)
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        self._iLaunchBlastclust.setOutputFilePrefix(outFilePrefix)
        self._iLaunchBlastclust.retrieveInitHeaders(dIn)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_filterUnclusteredSequences(self):
        dClusterId2SeqHeaders = {1: ["seq1", "seq2"], 2: ["seq3"]}
        dExp = {1: ["seq1", "seq2"]}

        dObs = self._iLaunchBlastclust.filterUnclusteredSequences(
            dClusterId2SeqHeaders)

        self.assertEqual(dObs, dExp)

    def test_blastclustToMap(self):
        inFileName = "dummyBlastclustOut_%s.fa" % self._uniqId
        # The last record deliberately has no trailing newline.
        self._writeFile(
            inFileName,
            ">BlastclustCluster1Mb1_chunk1 (dbseq-nr 1) [1,14]\n"
            "gaattgtttactta\n"
            ">BlastclustCluster1Mb2_chunk1 (dbseq-nr 1) [30,44]\n"
            "gaattgtttactta\n"
            ">BlastclustCluster2Mb1_chunk2 (dbseq-nr 1) [100,114]\n"
            "gaattgtttactta\n"
            ">BlastclustCluster3Mb1_chunk5 (dbseq-nr 8) [1000,1014]\n"
            "gaattgtttactta")

        fileExp = "blastclustToMapExpected.map"
        self._writeFile(
            fileExp,
            "BlastclustCluster1Mb1\tchunk1\t1\t14\n"
            "BlastclustCluster1Mb2\tchunk1\t30\t44\n"
            "BlastclustCluster2Mb1\tchunk2\t100\t114\n"
            "BlastclustCluster3Mb1\tchunk5\t1000\t1014\n")

        fileObs = "%s.map" % os.path.splitext(inFileName)[0]
        self._registerForCleanup(fileObs)
        self._iLaunchBlastclust.blastclustToMap(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)


if __name__ == "__main__":
    unittest.main()