comparison test/integration_tests.py @ 0:9d5f4f5f764b
Initial commit to toolshed
author | pieter.lukasse@wur.nl
date | Thu, 16 Jan 2014 13:10:00 +0100
parents | (none)
children | 53e1eee93430
comparison of -1:000000000000 (null revision) with 0:9d5f4f5f764b
'''Integration tests for the GCMS project'''

from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
from GCMS import library_lookup, combine_output
from GCMS.rankfilter_GCMS import rankfilter
import os.path
import sys
import unittest
import re


class IntegrationTest(unittest.TestCase):
    def test_library_lookup(self):
        '''
        Run main for data/NIST_tabular and compare produced files with references determined earlier.
        '''
        # Create out folder
        outdir = "output/"  # tempfile.mkdtemp(prefix='test_library_lookup')
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_library_lookup')
        outfile_txt = outfile_base + '.txt'

        # Build up arguments and run
        input_txt = resource_filename(__name__, "data/NIST_tabular.txt")
        library = resource_filename(__name__, "data/RIDB_subset.txt")
        regress_model = resource_filename(__name__, "data/ridb_poly_regression.txt")
        sys.argv = ['test',
                    library,
                    input_txt,
                    'Capillary',
                    'Semi-standard non-polar',
                    outfile_txt,
                    'HP-5',
                    regress_model]
        # Execute main function with arguments provided through sys.argv
        library_lookup.main()
        # Compare with reference files
        reference_txt = resource_filename(__name__, 'reference/produced_library_lookup.txt')

        # read both the reference file and the actual output file
        expected = _read_file(reference_txt)
        actual = _read_file(outfile_txt)

        # convert the read-in files to lists we can compare
        expected = expected.split()
        actual = actual.split()

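        # Values that look like decimal numbers (e.g. "123.456") are compared with a
        # tolerance (5 decimal places); all other tokens must match exactly.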
        for exp, act in zip(expected, actual):
            if re.match(r'\d+\.\d+', exp):
                exp = float(exp)
                act = float(act)
                self.assertAlmostEqual(exp, act, places=5)
            else:
                # compare the individual values
                self.failUnlessEqual(exp, act)


    def test_combine_output_simple(self):
        '''
        Run combine_output.main for data/Rankfilter.txt and data/Caslookup.txt and write
        the combined single- and multi-hit output files. The comparison with reference
        files is currently commented out below.
        '''
        # Create out folder
        outdir = "output/"  # tempfile.mkdtemp(prefix='test_library_lookup')
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_combine_output')
        outfile_single_txt = outfile_base + '_single.txt'
        outfile_multi_txt = outfile_base + '_multi.txt'

        # Build up arguments and run
        input_rankfilter = resource_filename(__name__, "data/Rankfilter.txt")
        input_caslookup = resource_filename(__name__, "data/Caslookup.txt")
        sys.argv = ['test',
                    input_rankfilter,
                    input_caslookup,
                    outfile_single_txt,
                    outfile_multi_txt]
        # Execute main function with arguments provided through sys.argv
        combine_output.main()
        # Compare with reference files
        # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt')
        # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt')
        # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt))
        # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt))

        # Clean up
        # shutil.rmtree(tempdir)

    def def_test_rank_filter_advanced(self):
        '''
        Run main of RankFilter. Not picked up by unittest discovery (the name does not
        start with "test"); it is invoked from test_combine_output_advanced below.
        '''
        # Create out folder
        outdir = "output/integration/"
        if not os.path.exists(outdir):
            os.makedirs(outdir)

        # Build up arguments and run
        input_txt = resource_filename(__name__, "data/integration/RankFilterInput_conf.txt")
        sys.argv = ['test',
                    input_txt]
        # Execute main function with arguments provided through sys.argv
        rankfilter.main()
        # Compare with reference files

    def def_test_library_lookup_advanced(self):
        '''
        Run library_lookup.main for data/integration/NIST_identification_results_tabular.txt.
        Like the rank filter step above, this is invoked from test_combine_output_advanced below.
        '''
        # Create out folder
        outdir = "output/integration/"
        if not os.path.exists(outdir):
            os.makedirs(outdir)
        outfile_base = os.path.join(outdir, 'produced_library_lookup_ADVANCED')
        outfile_txt = outfile_base + '.txt'

        # Build up arguments and run
        input_txt = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
        library = resource_filename(__name__, "data/integration/Library_RI_DB_capillary_columns-noDuplicates.txt")
        regress_model = resource_filename(__name__, "data/integration/regression_MODEL_for_columns.txt")
        sys.argv = ['test',
                    library,
                    input_txt,
                    'Capillary',
                    'Semi-standard non-polar',
                    outfile_txt,
                    'DB-5',
                    regress_model]
        # Execute main function with arguments provided through sys.argv
        library_lookup.main()

    def test_combine_output_advanced(self):
        '''
        Variant on the test case above, but a bit more complex as some of the centrotypes
        have different NIST hits, which should give them different RI values. This test
        runs not only the combine output step but also the two preceding steps, so it
        ensures the integration works on the current code of all three tools.
        '''

        # Run RankFilter
        self.def_test_rank_filter_advanced()

        # Run library CAS RI lookup
        self.def_test_library_lookup_advanced()

        outdir = "output/integration/"
        outfile_base = os.path.join(outdir, 'produced_combine_output')
        outfile_single_txt = outfile_base + '_single.txt'
        outfile_multi_txt = outfile_base + '_multi.txt'

        # Build up arguments and run
        input_rankfilter = resource_filename(__name__, "output/integration/produced_rank_filter_out.txt")
        input_caslookup = resource_filename(__name__, "output/integration/produced_library_lookup_ADVANCED.txt")
        sys.argv = ['test',
                    input_rankfilter,
                    input_caslookup,
                    outfile_single_txt,
                    outfile_multi_txt]
        # Execute main function with arguments provided through sys.argv
        combine_output.main()
        # Compare with reference files
        # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt')
        # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt')
        # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt))
        # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt))

        # Check 1: output single should have one record per centrotype
        # (see the sketch right after Check 2 below):

        # Check 2: output single has more records than output multi:
        combine_result_single_items = combine_output._process_data(outfile_single_txt)
        combine_result_multi_items = combine_output._process_data(outfile_multi_txt)
        self.assertGreater(len(combine_result_single_items['Centrotype']),
                           len(combine_result_multi_items['Centrotype']))
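
        # Check 1 (sketch; assumes "one record per centrotype" means the 'Centrotype'
        # column of the single output contains no duplicate ids):
        self.assertEqual(len(combine_result_single_items['Centrotype']),
                         len(set(combine_result_single_items['Centrotype'])))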

        # Check 3: library_lookup RI column, centrotype column, ri_svr column are correct:
        caslookup_items = combine_output._process_data(input_caslookup)
        rankfilter_items = combine_output._process_data(input_rankfilter)

        # check that the caslookup RI column is correctly maintained in its original order in
        # the combined file:
        ri_caslookup = caslookup_items['RI']
        ri_combine_single = combine_result_single_items['RI']
        self.assertListEqual(ri_caslookup, ri_combine_single)

        # check the centrotype column's integrity:
        centrotype_caslookup = caslookup_items['Centrotype']
        centrotype_combine_single = combine_result_single_items['Centrotype']
        centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID'])
        self.assertListEqual(centrotype_caslookup, centrotype_combine_single)
        self.assertListEqual(centrotype_caslookup, centrotype_rankfilter)

        # integration and integrity checks:
        file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
        file_NIST_items = combine_output._process_data(file_NIST)
        # check that the rank filter output has exactly the same ID items as the original NIST input file:
        self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID'])
        # check the same for the CAS column:
        self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS'])
        # now check the NIST CAS column against the cas lookup results:
        cas_NIST = _get_processedcas(file_NIST_items['CAS'])
        self.assertListEqual(cas_NIST, caslookup_items['CAS'])
        # now check the CAS of the combined result. If all checks are OK, it means the CAS column's
        # order and values remained stable throughout all steps:
        self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS'])

        # check that the rankfilter RIsvr column is correctly maintained in its original order in
        # the combined file:
        risvr_rankfilter = rankfilter_items['RIsvr']
        risvr_combine_single = combine_result_single_items['RIsvr']
        self.assertListEqual(risvr_rankfilter, risvr_combine_single)


def _get_centrotype_rankfilter(id_list):
    '''
    Returns the list of centrotype ids for a list of compound IDs of the
    form e.g. 74-1.0-564-1905200-7, where the number before the
    first "-" is the centrotype id.
    '''
    result = []
    for compound_id in id_list:
        centrotype = compound_id.split('-')[0]
        result.append(centrotype)

    return result


def _get_processedcas(cas_list):
    '''
    Returns the list of CAS numbers in the form C64175 instead of 64-17-5.
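
    Illustrative doctest-style example (value taken from the description above):

    >>> _get_processedcas(['64-17-5'])
    ['C64175']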
    '''
    result = []
    for cas in cas_list:
        processed_cas = 'C' + str(cas.replace('-', '').strip())
        result.append(processed_cas)

    return result


def _get_strippedcas(cas_list):
    '''
    Removes the leading white space from each CAS number, e.g. " 64-17-5".
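
    Illustrative doctest-style example (value taken from the description above):

    >>> _get_strippedcas([' 64-17-5'])
    ['64-17-5']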
    '''
    result = []
    for cas in cas_list:
        processed_cas = cas.strip()
        result.append(processed_cas)

    return result


def _read_file(filename):
    '''
    Helper method to quickly read a file
    @param filename: path of the file to read
    '''
    with open(filename) as handle:
        return handle.read()
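

if __name__ == '__main__':
    # Optional convenience entry point: allows running this module directly with
    # "python integration_tests.py", assuming the GCMS package and the test data /
    # reference folders are available as expected by the imports above.
    unittest.main()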