|
539
|
1 """
|
|
|
2 Unit tests for MAREA, flux_simulation, and related visualization modules.
|
|
|
3
|
|
|
4 Run with: python -m pytest test_marea.py -v
|
|
|
5 Or: python test_marea.py
|
|
|
6 """
|
|
|
7
|
|
|
8 import sys
|
|
|
9 import os
|
|
|
10 import pandas as pd
|
|
|
11 import numpy as np
|
|
|
12 import tempfile
|
|
|
13 from pathlib import Path
|
|
|
14
|
|
|
# pytest is optional: when it cannot be imported, a tiny stand-in that only
# provides ``raises`` is bound to the same name so the tests below still run.
try:
    import pytest
except ImportError:
    HAS_PYTEST = False

    class _DummyPytest:
        """Minimal replacement exposing only the ``raises`` context manager."""

        class raises:
            """Context manager that succeeds only if an expected exception occurs."""

            def __init__(self, *args, **kwargs):
                # Positional arguments are the expected exception types (each
                # may itself be a tuple of types); keyword arguments are
                # accepted and ignored.
                self.expected_exceptions = args

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_val, exc_tb):
                if exc_type is None:
                    raise AssertionError("Expected an exception but none was raised")
                # A true result swallows the expected exception; a false one
                # lets an unexpected exception propagate to the caller.
                return any(
                    issubclass(exc_type, expected)
                    for expected in self.expected_exceptions
                )

    pytest = _DummyPytest()
else:
    HAS_PYTEST = True
|
|
|
34
|
|
|
# Resolve the tool directory (the parent of this file's directory) once, as an
# absolute path, and put it at the front of sys.path. Using the absolute path
# keeps the entry valid even if the working directory changes after start-up.
TOOL_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, TOOL_DIR)

# These imports rely on the sys.path entry added above.
import marea
import flux_simulation
import flux_to_map
import ras_to_bounds
import utils.general_utils as utils
|
|
|
46
|
|
|
47
|
|
|
48 class TestMAREA:
|
|
|
49 """Tests for marea module"""
|
|
|
50
|
|
|
51 def test_process_args(self):
|
|
|
52 """Test argument processing for MAREA"""
|
|
|
53 # Create minimal args for testing
|
|
|
54 with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
|
|
|
55 f.write("reaction_id,value\n")
|
|
|
56 f.write("r1,1.5\n")
|
|
|
57 temp_file = f.name
|
|
|
58
|
|
|
59 try:
|
|
|
60 args = marea.process_args([
|
|
|
61 '-td', TOOL_DIR,
|
|
|
62 '--tool_dir', TOOL_DIR
|
|
|
63 ])
|
|
|
64 assert hasattr(args, 'tool_dir')
|
|
|
65 assert args.tool_dir == TOOL_DIR
|
|
|
66 finally:
|
|
|
67 if os.path.exists(temp_file):
|
|
|
68 os.unlink(temp_file)
|
|
|
69
|
|
|
70 def test_comparison_types(self):
|
|
|
71 """Test that comparison type enum exists and is correct"""
|
|
|
72 # Check that the ComparisonType enum has expected values
|
|
|
73 assert hasattr(marea, 'ComparisonType') or hasattr(marea, 'GroupingCriterion')
|
|
|
74
|
|
|
75 def test_ras_transformation(self):
|
|
|
76 """Test RAS transformation logic"""
|
|
|
77 # Create sample RAS data
|
|
|
78 ras_data = pd.DataFrame({
|
|
|
79 'reaction': ['r1', 'r2', 'r3'],
|
|
|
80 'value': [1.5, 0.5, 2.0]
|
|
|
81 })
|
|
|
82
|
|
|
83 # Test that data can be processed
|
|
|
84 assert len(ras_data) == 3
|
|
|
85 assert ras_data['value'].max() == 2.0
|
|
|
86
|
|
|
87
|
|
|
88 class TestFluxSimulation:
|
|
|
89 """Tests for flux_simulation module"""
|
|
|
90
|
|
|
91 def test_process_args(self):
|
|
|
92 """Test argument processing for flux simulation"""
|
|
|
93 args = flux_simulation.process_args([
|
|
|
94 '-td', TOOL_DIR
|
|
|
95 ])
|
|
|
96 assert hasattr(args, 'tool_dir')
|
|
|
97
|
|
|
98 def test_flux_balance_setup(self):
|
|
|
99 """Test that FBA setup functions exist"""
|
|
|
100 # Check that key functions exist
|
|
|
101 assert hasattr(flux_simulation, 'process_args')
|
|
|
102 assert hasattr(flux_simulation, 'main')
|
|
|
103
|
|
|
104
|
|
|
105 class TestFluxToMap:
|
|
|
106 """Tests for flux_to_map module"""
|
|
|
107
|
|
|
108 def test_process_args(self):
|
|
|
109 """Test argument processing for flux to map"""
|
|
|
110 args = flux_to_map.process_args([
|
|
|
111 '-td', TOOL_DIR
|
|
|
112 ])
|
|
|
113 assert hasattr(args, 'tool_dir')
|
|
|
114
|
|
|
115 def test_color_map_options(self):
|
|
|
116 """Test that color map options are available"""
|
|
|
117 # The module should have color map functionality
|
|
|
118 assert hasattr(flux_to_map, 'process_args')
|
|
|
119
|
|
|
120
|
|
|
121 class TestRasToBounds:
|
|
|
122 """Tests for ras_to_bounds module"""
|
|
|
123
|
|
|
124 def test_process_args(self):
|
|
|
125 """Test argument processing for RAS to bounds"""
|
|
|
126 args = ras_to_bounds.process_args([
|
|
|
127 '-td', TOOL_DIR
|
|
|
128 ])
|
|
|
129 assert hasattr(args, 'tool_dir')
|
|
|
130
|
|
|
131 def test_bounds_conversion(self):
|
|
|
132 """Test that bounds conversion logic exists"""
|
|
|
133 # Create sample RAS data
|
|
|
134 ras_data = {
|
|
|
135 'r1': 1.5,
|
|
|
136 'r2': 0.5,
|
|
|
137 'r3': 2.0
|
|
|
138 }
|
|
|
139
|
|
|
140 # Test basic transformation logic
|
|
|
141 # Reactions with higher RAS should have higher bounds
|
|
|
142 assert ras_data['r3'] > ras_data['r1'] > ras_data['r2']
|
|
|
143
|
|
|
144
|
|
|
145 class TestModelConversion:
|
|
|
146 """Tests for model conversion tools"""
|
|
|
147
|
|
|
148 def test_tabular_to_model(self):
|
|
|
149 """Test tabular to model conversion"""
|
|
|
150 import tabular2MetabolicModel
|
|
|
151
|
|
|
152 args = tabular2MetabolicModel.process_args([])
|
|
|
153 assert hasattr(args, 'tool_dir')
|
|
|
154
|
|
|
155 def test_model_to_tabular(self):
|
|
|
156 """Test model to tabular conversion"""
|
|
|
157 import metabolicModel2Tabular
|
|
|
158
|
|
|
159 args = metabolicModel2Tabular.process_args([])
|
|
|
160 assert hasattr(args, 'tool_dir')
|
|
|
161
|
|
|
162
|
|
|
class TestDataProcessing:
    """Tests for the tabular data layouts shared across tools."""

    def test_ras_data_format(self):
        """A RAS table has a ``reaction_id`` column and at least one row."""
        ras_columns = {
            'reaction_id': ['r1', 'r2', 'r3'],
            'group1': [1.5, 0.5, 2.0],
            'group2': [1.8, 0.3, 2.2],
        }
        ras_df = pd.DataFrame(ras_columns)

        assert 'reaction_id' in ras_df.columns
        assert len(ras_df) > 0

    def test_rps_data_format(self):
        """An RPS table has a ``reaction_id`` column and at least one row."""
        rps_columns = {
            'reaction_id': ['r1', 'r2', 'r3'],
            'sample1': [100.5, 50.3, 200.1],
            'sample2': [150.2, 30.8, 250.5],
        }
        rps_df = pd.DataFrame(rps_columns)

        assert 'reaction_id' in rps_df.columns
        assert len(rps_df) > 0

    def test_flux_data_format(self):
        """A flux table has both ``reaction_id`` and ``flux`` columns."""
        flux_columns = {
            'reaction_id': ['r1', 'r2', 'r3'],
            'flux': [1.5, -0.5, 2.0],
            'lower_bound': [-10, -10, 0],
            'upper_bound': [10, 10, 10],
        }
        flux_df = pd.DataFrame(flux_columns)

        for required in ('reaction_id', 'flux'):
            assert required in flux_df.columns
|
|
|
202
|
|
|
203
|
|
|
class TestStatistics:
    """Tests for the statistical operations used by MAREA."""

    def test_fold_change_calculation(self):
        """Fold change is the ratio of the second group mean to the first."""
        group1_mean = 2.0
        group2_mean = 4.0
        fold_change = group2_mean / group1_mean

        # Compare with a tolerance rather than exact float equality.
        assert np.isclose(fold_change, 2.0)

    def test_log_fold_change(self):
        """log2 fold change of 8 versus 2 is 2."""
        group1_mean = 2.0
        group2_mean = 8.0
        log_fc = np.log2(group2_mean / group1_mean)

        # log2(8/2) = log2(4) = 2; tolerance guards against rounding in log2.
        assert np.isclose(log_fc, 2.0)

    def test_pvalue_correction(self):
        """A t-test on two identical constant groups must not look significant.

        NOTE(review): despite the name, no multiple-testing correction is
        exercised here; only ``scipy.stats.ttest_ind`` on degenerate input.
        """
        group1 = [1.0, 1.0, 1.0]
        group2 = [1.0, 1.0, 1.0]

        from scipy import stats
        t_stat, p_value = stats.ttest_ind(group1, group2)

        # With zero variance in both groups the p-value is accepted as either
        # NaN or close to 1.
        assert np.isnan(p_value) or p_value > 0.9
|
|
|
235
|
|
|
236
|
|
|
237 class TestMapVisualization:
|
|
|
238 """Tests for SVG map visualization"""
|
|
|
239
|
|
|
240 def test_svg_maps_exist(self):
|
|
|
241 """Test that SVG maps exist"""
|
|
|
242 map_dir = os.path.join(TOOL_DIR, "local", "svg metabolic maps")
|
|
|
243 assert os.path.exists(map_dir)
|
|
|
244
|
|
|
245 # Check for at least one map
|
|
|
246 maps = [f for f in os.listdir(map_dir) if f.endswith('.svg')]
|
|
|
247 assert len(maps) > 0, "No SVG maps found"
|
|
|
248
|
|
|
249 def test_model_has_map(self):
|
|
|
250 """Test that models have associated maps"""
|
|
|
251 # ENGRO2 should have a map
|
|
|
252 engro2_map = os.path.join(TOOL_DIR, "local", "svg metabolic maps", "ENGRO2_map.svg")
|
|
|
253 if os.path.exists(engro2_map):
|
|
|
254 assert os.path.getsize(engro2_map) > 0
|
|
|
255
|
|
|
256 def test_color_gradient(self):
|
|
|
257 """Test color gradient generation"""
|
|
|
258 # Test that we can generate colors for a range of values
|
|
|
259 values = [-2.0, -1.0, 0.0, 1.0, 2.0]
|
|
|
260
|
|
|
261 # All values should be processable
|
|
|
262 for val in values:
|
|
|
263 # Simple color mapping test
|
|
|
264 if val < 0:
|
|
|
265 # Negative values should map to one color scheme
|
|
|
266 assert val < 0
|
|
|
267 elif val > 0:
|
|
|
268 # Positive values should map to another
|
|
|
269 assert val > 0
|
|
|
270 else:
|
|
|
271 # Zero should be neutral
|
|
|
272 assert val == 0
|
|
|
273
|
|
|
274
|
|
|
class TestIntegration:
    """Integration-style checks on small in-memory tables."""

    def test_ras_to_marea_workflow(self):
        """A fold-change column can be derived from control/treatment RAS values."""
        ras_data = pd.DataFrame({
            'reaction_id': ['r1', 'r2', 'r3'],
            'control': [1.5, 0.8, 1.2],
            'treatment': [2.0, 0.5, 1.8],
        })

        # Element-wise treatment / control ratio.
        ratio = ras_data['treatment'] / ras_data['control']
        ras_data['fold_change'] = ratio

        assert 'fold_change' in ras_data.columns
        assert len(ras_data) == 3

    def test_rps_to_flux_workflow(self):
        """An upper-bound column can be derived from RPS values."""
        rps_data = pd.DataFrame({
            'reaction_id': ['r1', 'r2', 'r3'],
            'rps': [100.0, 50.0, 200.0],
        })

        # Scale RPS down by a factor of ten to obtain the bound.
        scaled = rps_data['rps'] / 10
        rps_data['upper_bound'] = scaled

        assert 'upper_bound' in rps_data.columns
|
|
|
305
|
|
|
306
|
|
|
307 class TestErrorHandling:
|
|
|
308 """Tests for error handling across modules"""
|
|
|
309
|
|
|
310 def test_invalid_model_name(self):
|
|
|
311 """Test handling of invalid model names"""
|
|
|
312 with pytest.raises((ValueError, KeyError, AttributeError)):
|
|
|
313 utils.Model("INVALID_MODEL")
|
|
|
314
|
|
|
315 def test_missing_required_column(self):
|
|
|
316 """Test handling of missing required columns"""
|
|
|
317 # Create incomplete data
|
|
|
318 incomplete_data = pd.DataFrame({
|
|
|
319 'wrong_column': [1, 2, 3]
|
|
|
320 })
|
|
|
321
|
|
|
322 # Should fail when looking for required columns
|
|
|
323 with pytest.raises(KeyError):
|
|
|
324 value = incomplete_data['reaction_id']
|
|
|
325
|
|
|
326
|
|
|
def run_basic_tests(test_instances):
    """Run every ``test_*`` method of the given objects without pytest.

    Args:
        test_instances: Iterable of test-class instances to exercise.

    Returns:
        A ``(passed, failed)`` tuple with the number of methods in each state.
    """
    import traceback  # local import: only this fallback runner needs it

    passed = 0
    failed = 0

    for instance in test_instances:
        print(f"\n{type(instance).__name__}:")

        for method_name in dir(instance):
            if not method_name.startswith("test_"):
                continue
            try:
                getattr(instance, method_name)()
            except Exception as e:
                # Runner boundary: record the failure and keep going so one
                # broken test does not hide the results of the others.
                print(f" ✗ {method_name}: {str(e)}")
                traceback.print_exc()
                failed += 1
            else:
                print(f" ✓ {method_name}")
                passed += 1

    return passed, failed


if __name__ == "__main__":
    if HAS_PYTEST:
        # Propagate pytest's exit status so a failing run is visible to the
        # caller, matching the non-zero exit of the fallback path below.
        sys.exit(pytest.main([__file__, "-v"]))
    else:
        print("pytest not available, running basic tests...")

        passed, failed = run_basic_tests([
            TestMAREA(),
            TestFluxSimulation(),
            TestFluxToMap(),
            TestRasToBounds(),
            TestModelConversion(),
            TestDataProcessing(),
            TestStatistics(),
            TestMapVisualization(),
            TestIntegration(),
            TestErrorHandling(),
        ])

        print(f"\n{'='*60}")
        print(f"Results: {passed} passed, {failed} failed")
        if failed > 0:
            sys.exit(1)
|