Mercurial > repos > yufei-luo > s_mart
comparison commons/launcher/tests/Test_LaunchBlastclust.py @ 31:0ab839023fe4
Uploaded
author | m-zytnicki |
---|---|
date | Tue, 30 Apr 2013 14:33:21 -0400 |
parents | 94ab73e8a190 |
children |
comparison
equal
deleted
inserted
replaced
30:5677346472b5 | 31:0ab839023fe4 |
---|---|
1 import unittest | |
2 import time | |
3 import os | |
4 from commons.launcher.LaunchBlastclust import LaunchBlastclust | |
5 from commons.core.utils.FileUtils import FileUtils | |
6 | |
class Test_LaunchBlastclust( unittest.TestCase ):
    """Unit tests for the file-handling helpers of LaunchBlastclust.

    Each test writes small fixture files into the current working
    directory, runs one LaunchBlastclust method on them and compares the
    result with the expected value.

    Every file a test creates (or expects the code under test to create)
    is recorded in ``self._lFilesToRemove`` and deleted in ``tearDown``,
    so no fixture is left behind when an assertion fails.
    """

    # FASTA fixture shared by the two getClusteringResultsInFasta tests.
    _FIVE_SEQUENCES_FASTA = (
        ">seq1\n"
        "gaattgtttactta\n"
        ">seq2\n"
        "gaattgtttactta\n"
        ">seq3\n"
        "gaattgtttactta\n"
        ">seq4\n"
        "gaattgtttactta\n"
        ">seq5\n"
        "gaattgtttactta\n"
    )

    def setUp(self):
        """Create a fresh launcher and a per-run unique identifier."""
        # Initialised first so that tearDown can always iterate over it.
        self._lFilesToRemove = []
        self._iLaunchBlastclust = LaunchBlastclust()
        self._iLaunchBlastclust.setClean()
        # Timestamp + PID keeps fixture names distinct between concurrent runs.
        self._uniqId = "%s_%s" % (time.strftime("%Y%m%d%H%M%S"), os.getpid())

    def tearDown(self):
        """Delete every registered file that still exists, then reset state."""
        for fileName in self._lFilesToRemove:
            # The code under test may already have removed (or never have
            # produced) a file, hence the existence check.
            if os.path.exists(fileName):
                os.remove(fileName)
        self._lFilesToRemove = None
        self._iLaunchBlastclust = None
        self._uniqId = None

    def _registerForCleanup(self, fileName):
        """Schedule ``fileName`` for deletion in ``tearDown`` and return it."""
        self._lFilesToRemove.append(fileName)
        return fileName

    def _writeFile(self, fileName, content):
        """Write ``content`` to ``fileName`` and schedule the file for cleanup."""
        # Registered before writing so a partially written file is removed too.
        self._registerForCleanup(fileName)
        with open(fileName, "w") as handle:
            handle.write(content)

    def _assertFilesIdentical(self, fileObs, fileExp):
        """Fail the test unless the observed and expected files are identical."""
        self.assertTrue(
            FileUtils.are2FilesIdentical(fileObs, fileExp),
            "Files '%s' and '%s' are different" % (fileObs, fileExp))

    def test_getClustersFromTxtFile(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName,
                        "seq1 seq3 seq4 \n"
                        "seq2 seq5 \n")
        dExp = {1:["seq1","seq3","seq4"], 2:["seq2","seq5"]}

        self._iLaunchBlastclust.setTmpFileName(inFileName)
        dObs = self._iLaunchBlastclust.getClustersFromTxtFile()

        self.assertEqual(dObs, dExp)

    def test_getClusteringResultsInFasta_without_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName, self._FIVE_SEQUENCES_FASTA)

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName,
                        "seq1 seq3 seq4 \n"
                        "seq2 seq5 \n")
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(fileExp,
                        ">BlastclustCluster1Mb1_seq1\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb2_seq3\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb3_seq4\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster2Mb1_seq2\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster2Mb2_seq5\n"
                        "gaattgtttactta\n")

        # Registered before the call so the output is removed even if the
        # method under test raises after creating it.
        fileObs = self._registerForCleanup(
            "%s_Blastclust.fa" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getClusteringResultsInFasta_with_filtering(self):
        inFileName = "dummyInFile_%s" % self._uniqId
        self._writeFile(inFileName, self._FIVE_SEQUENCES_FASTA)

        tmpFileName = "%s_blastclust.txt" % self._uniqId
        self._writeFile(tmpFileName,
                        "seq1 seq3 seq4 \n"
                        "seq2\n"
                        "seq5\n")
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)

        # Only the multi-member cluster is expected in the output.
        fileExp = "getClusteringResultsInFastaExpected.fa"
        self._writeFile(fileExp,
                        ">BlastclustCluster1Mb1_seq1\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb2_seq3\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb3_seq4\n"
                        "gaattgtttactta\n")

        fileObs = self._registerForCleanup(
            "%s_Blastclust.fa" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.setFilterUnclusteredSequences()
        self._iLaunchBlastclust.getClusteringResultsInFasta(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_getLinkInitNewHeaders(self):
        inFileName = "dummyInput_%s.shortHlink" % self._uniqId
        self._writeFile(inFileName,
                        "seq1\tHeader1\t1\t5193\n"
                        "seq2\tHeader2\t1\t5193\n"
                        "seq3\tHeader3\t1\t5193\n"
                        "seq4\tHeader4\t1\t5193\n")

        # The launcher is given the name without the ".shortHlink" suffix.
        self._iLaunchBlastclust.setInputFileName("dummyInput_%s" % self._uniqId)
        dObs = self._iLaunchBlastclust.getLinkInitNewHeaders()
        dExp = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        self.assertEqual(dObs, dExp)

    def test_retrieveInitHeaders(self):
        dIn = {"seq1":"Header1", "seq2":"Header2", "seq3":"Header3", "seq4":"Header4"}

        inFileName = "dummyInFile_%s" % self._uniqId
        outFilePrefix = self._uniqId

        tmpFileName = "%s_blastclust.txt" % outFilePrefix
        self._writeFile(tmpFileName,
                        "seq1 seq3 seq4\n"
                        "seq2\n")

        shortHFile = "%s.shortH_Blastclust.fa" % inFileName
        self._writeFile(shortHFile,
                        ">BlastclustCluster1Mb1_seq1\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb2_seq3\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb3_seq4\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster2Mb1_seq2\n"
                        "gaattgtttactta\n")

        fileExp = "retrieveInitHeadersExpected.fa"
        self._writeFile(fileExp,
                        ">BlastclustCluster1Mb1_Header1\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb2_Header3\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb3_Header4\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster2Mb1_Header2\n"
                        "gaattgtttactta\n")

        fileObs = self._registerForCleanup("%s_Blastclust.fa" % outFilePrefix)
        self._iLaunchBlastclust.setInputFileName(inFileName)
        self._iLaunchBlastclust.setTmpFileName(tmpFileName)
        self._iLaunchBlastclust.setOutputFilePrefix(outFilePrefix)
        self._iLaunchBlastclust.retrieveInitHeaders(dIn)

        self._assertFilesIdentical(fileObs, fileExp)

    def test_filterUnclusteredSequences(self):
        dClusterId2SeqHeaders = {1: ["seq1","seq2"], 2: ["seq3"]}
        dExp = {1: ["seq1","seq2"]}
        dObs = self._iLaunchBlastclust.filterUnclusteredSequences(dClusterId2SeqHeaders)
        self.assertEqual(dObs, dExp)

    def test_blastclustToMap(self):
        inFileName = "dummyBlastclustOut_%s.fa" % self._uniqId
        # The last sequence line deliberately has no trailing newline,
        # exactly as in the original fixture.
        self._writeFile(inFileName,
                        ">BlastclustCluster1Mb1_chunk1 (dbseq-nr 1) [1,14]\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster1Mb2_chunk1 (dbseq-nr 1) [30,44]\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster2Mb1_chunk2 (dbseq-nr 1) [100,114]\n"
                        "gaattgtttactta\n"
                        ">BlastclustCluster3Mb1_chunk5 (dbseq-nr 8) [1000,1014]\n"
                        "gaattgtttactta")

        fileExp = "blastclustToMapExpected.map"
        self._writeFile(fileExp,
                        "BlastclustCluster1Mb1\tchunk1\t1\t14\n"
                        "BlastclustCluster1Mb2\tchunk1\t30\t44\n"
                        "BlastclustCluster2Mb1\tchunk2\t100\t114\n"
                        "BlastclustCluster3Mb1\tchunk5\t1000\t1014\n")

        fileObs = self._registerForCleanup(
            "%s.map" % os.path.splitext(inFileName)[0])
        self._iLaunchBlastclust.blastclustToMap(inFileName)

        self._assertFilesIdentical(fileObs, fileExp)
235 | |
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run the tests above.
    unittest.main()