18
|
1 import unittest
|
|
2 import time
|
|
3 import os
|
|
4 from commons.launcher.LaunchBlastclust import LaunchBlastclust
|
|
5 from commons.core.utils.FileUtils import FileUtils
|
|
6
|
|
class Test_LaunchBlastclust( unittest.TestCase ):
    """Unit tests for LaunchBlastclust.

    Each test writes small fixture files into the current working directory,
    calls one LaunchBlastclust method and compares the observed result with
    the expected one.  Every file a test creates (or expects the tested code
    to create) is registered through the private helpers below and removed
    in tearDown(), so fixtures do not pile up when an assertion fails.
    """

    # Nucleotide sequence shared by every FASTA record written by these tests.
    _SEQUENCE = "gaattgtttactta"

    def setUp(self):
        """Create a LaunchBlastclust instance (with setClean() applied), a
        per-test unique identifier and an empty list of files to remove."""
        self._iLaunchBlastclust = LaunchBlastclust()
        self._iLaunchBlastclust.setClean()
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())
        self._lFilesToRemove = []

    def tearDown(self):
        """Remove every registered file that still exists, then drop the
        references created in setUp()."""
        for fileName in self._lFilesToRemove:
            # The tested code may already have deleted some of its inputs,
            # hence the existence check instead of a bare os.remove().
            if os.path.exists(fileName):
                os.remove(fileName)
        self._lFilesToRemove = None
        self._iLaunchBlastclust = None
        self._uniqId = None

    def _registerFile(self, fileName):
        """Schedule 'fileName' for removal in tearDown() and return it."""
        self._lFilesToRemove.append(fileName)
        return fileName

    def _writeFile(self, fileName, lLines):
        """Write the strings of 'lLines' verbatim into 'fileName'.

        The file is registered for removal before it is opened so that it is
        cleaned up even if writing fails half-way.
        """
        self._registerFile(fileName)
        with open(fileName, "w") as fileHandler:
            fileHandler.writelines(lLines)
        return fileName

    def _getFastaLines(self, lHeaders):
        """Return the lines of a FASTA file holding one record per header,
        each record carrying the same dummy sequence."""
        lLines = []
        for header in lHeaders:
            lLines.append(">%s\n" % header)
            lLines.append("%s\n" % self._SEQUENCE)
        return lLines

    def _assertFilesIdentical(self, fileObs, fileExp):
        """Fail the current test unless FileUtils reports both files as
        identical."""
        self.assertTrue(FileUtils.are2FilesIdentical(fileObs, fileExp),
                        "files '%s' and '%s' are different" % (fileObs, fileExp))

    def test_getClustersFromTxtFile(self):
        """Each line of the cluster file becomes one entry of the returned
        dictionary, keyed by its 1-based line rank."""
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName, ["seq1 seq3 seq4 \n",
                                     "seq2 seq5 \n"])
        dExp = {1:["seq1","seq3","seq4"], 2:["seq2","seq5"]}
        self._iLaunchBlastclust.setTmpFileName(inFileName)
        dObs = self._iLaunchBlastclust.getClustersFromTxtFile()
        self.assertEqual(dObs, dExp)

    def test_getClusteringResultsInFasta_without_filtering(self):
        """Without filtering, every input sequence is written with a
        'BlastclustCluster<c>Mb<m>_' prefix added to its header."""
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName,
                        self._getFastaLines(["seq1", "seq2", "seq3", "seq4", "seq5"]))

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n",
                                      "seq2 seq5 \n"])
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(fileExp,
                        self._getFastaLines(["BlastclustCluster1Mb1_seq1",
                                             "BlastclustCluster1Mb2_seq3",
                                             "BlastclustCluster1Mb3_seq4",
                                             "BlastclustCluster2Mb1_seq2",
                                             "BlastclustCluster2Mb2_seq5"]))

        # Registered before the call so the output is removed even if the
        # comparison below fails.
        fileObs = self._registerFile("%s_Blastclust.fa" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getClusteringResultsInFasta_with_filtering(self):
        """After setFilterUnclusteredSequences(), the single-member clusters
        (seq2 and seq5 here) are absent from the output."""
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName,
                        self._getFastaLines(["seq1", "seq2", "seq3", "seq4", "seq5"]))

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName, ["seq1 seq3 seq4 \n",
                                      "seq2\n",
                                      "seq5\n"])
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(fileExp,
                        self._getFastaLines(["BlastclustCluster1Mb1_seq1",
                                             "BlastclustCluster1Mb2_seq3",
                                             "BlastclustCluster1Mb3_seq4"]))

        fileObs = self._registerFile("%s_Blastclust.fa" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.setFilterUnclusteredSequences()
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getLinkInitNewHeaders(self):
        """The '<input>.shortHlink' file is read into a dictionary mapping
        its first column to its second column."""
        inFileName = "dummyInput_%s.shortHlink" % self._uniqId
        self._writeFile(inFileName, ["seq1\tHeader1\t1\t5193\n",
                                     "seq2\tHeader2\t1\t5193\n",
                                     "seq3\tHeader3\t1\t5193\n",
                                     "seq4\tHeader4\t1\t5193\n"])

        self._iLaunchBlastclust.setInputFileName("dummyInput_%s" % self._uniqId)
        dObs = self._iLaunchBlastclust.getLinkInitNewHeaders()
        dExp = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        self.assertEqual(dObs, dExp)

    def test_retrieveInitHeaders(self):
        """Short headers of the '<input>.shortH_Blastclust.fa' file are
        replaced by the initial headers given in the dictionary."""
        dIn = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        inFileName = "dummyInFile_%s" % self._uniqId
        outFilePrefix = self._uniqId

        tmpFileName = "%s_blastclust.txt" % outFilePrefix
        self._writeFile(tmpFileName, ["seq1 seq3 seq4\n",
                                      "seq2\n"])

        # Registered like every other fixture: whether the tested method
        # deletes this file itself is not visible from this test.
        shortHFile = "%s.shortH_Blastclust.fa" % inFileName
        self._writeFile(shortHFile,
                        self._getFastaLines(["BlastclustCluster1Mb1_seq1",
                                             "BlastclustCluster1Mb2_seq3",
                                             "BlastclustCluster1Mb3_seq4",
                                             "BlastclustCluster2Mb1_seq2"]))

        fileExp = "retrieveInitHeadersExpected.fa"
        self._writeFile(fileExp,
                        self._getFastaLines(["BlastclustCluster1Mb1_Header1",
                                             "BlastclustCluster1Mb2_Header3",
                                             "BlastclustCluster1Mb3_Header4",
                                             "BlastclustCluster2Mb1_Header2"]))

        fileObs = self._registerFile("%s_Blastclust.fa" % outFilePrefix)
        self._iLaunchBlastclust.setInputFileName(inFileName)
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        self._iLaunchBlastclust.setOutputFilePrefix(outFilePrefix)
        self._iLaunchBlastclust.retrieveInitHeaders(dIn)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_filterUnclusteredSequences(self):
        """Clusters holding a single sequence are dropped."""
        dClusterId2SeqHeaders = {1: ["seq1","seq2"], 2: ["seq3"]}
        dExp = {1: ["seq1","seq2"]}
        dObs = self._iLaunchBlastclust.filterUnclusteredSequences(dClusterId2SeqHeaders)
        self.assertEqual(dObs, dExp)

    def test_blastclustToMap(self):
        """Each FASTA header is converted into one tab-separated line of the
        '.map' file: name, chunk, start, end."""
        inFileName = "dummyBlastclustOut_%s.fa" % self._uniqId
        self._writeFile(inFileName, [
            ">BlastclustCluster1Mb1_chunk1 (dbseq-nr 1) [1,14]\n",
            "gaattgtttactta\n",
            ">BlastclustCluster1Mb2_chunk1 (dbseq-nr 1) [30,44]\n",
            "gaattgtttactta\n",
            ">BlastclustCluster2Mb1_chunk2 (dbseq-nr 1) [100,114]\n",
            "gaattgtttactta\n",
            ">BlastclustCluster3Mb1_chunk5 (dbseq-nr 8) [1000,1014]\n",
            # Deliberately no trailing newline on the last line of the input.
            "gaattgtttactta",
        ])

        fileExp = "blastclustToMapExpected.map"
        self._writeFile(fileExp, ["BlastclustCluster1Mb1\tchunk1\t1\t14\n",
                                  "BlastclustCluster1Mb2\tchunk1\t30\t44\n",
                                  "BlastclustCluster2Mb1\tchunk2\t100\t114\n",
                                  "BlastclustCluster3Mb1\tchunk5\t1000\t1014\n"])

        fileObs = self._registerFile("%s.map" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.blastclustToMap(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)
|
|
235
|
|
if __name__ == "__main__":
    # Run as a script: hand control to the unittest command-line runner,
    # which loads and executes the test cases of this module.
    unittest.main()