Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/planemo/galaxy/test/actions.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26e78fe6e8c4 |
---|---|
1 """Actions related to running and reporting on Galaxy-specific testing.""" | |
2 | |
3 import io | |
4 import json | |
5 import os | |
6 from distutils.dir_util import copy_tree | |
7 | |
8 import click | |
9 from galaxy.util import unicodify | |
10 | |
11 from planemo.exit_codes import ( | |
12 EXIT_CODE_GENERIC_FAILURE, | |
13 EXIT_CODE_NO_SUCH_TARGET, | |
14 EXIT_CODE_OK, | |
15 ) | |
16 from planemo.galaxy.run import ( | |
17 run_galaxy_command, | |
18 setup_venv, | |
19 ) | |
20 from planemo.io import error, info, shell_join, warn | |
21 from planemo.reports import build_report | |
22 from planemo.test.results import get_dict_value | |
23 from . import structures as test_structures | |
24 | |
25 | |
26 NO_XUNIT_REPORT_MESSAGE = ("Cannot locate xUnit report [%s] for tests - " | |
27 "required to build planemo report and summarize " | |
28 "tests.") | |
29 NO_JSON_REPORT_MESSAGE = ("Cannot locate JSON report [%s] for tests - " | |
30 "required to build planemo report and summarize " | |
31 "tests.") | |
32 REPORT_NOT_CHANGED = ("Galaxy failed to update test report [%s] for tests - " | |
33 "required to build planemo report and summarize " | |
34 "tests.") | |
35 NO_TESTS_MESSAGE = "No tests were executed - see Galaxy output for details." | |
36 ALL_TESTS_PASSED_MESSAGE = "All %d test(s) executed passed." | |
37 PROBLEM_COUNT_MESSAGE = ("There were problems with %d test(s) - out of %d " | |
38 "test(s) executed. See %s for detailed breakdown.") | |
39 GENERIC_PROBLEMS_MESSAGE = ("One or more tests failed. See %s for detailed " | |
40 "breakdown.") | |
41 GENERIC_TESTS_PASSED_MESSAGE = "No failing tests encountered." | |
42 | |
43 | |
44 def run_in_config(ctx, config, run=run_galaxy_command, test_data_target_dir=None, **kwds): | |
45 """Run Galaxy tests with the run_tests.sh command. | |
46 | |
47 The specified `config` object describes the context for tool | |
48 execution. | |
49 """ | |
50 config_directory = config.config_directory | |
51 html_report_file = kwds["test_output"] | |
52 | |
53 job_output_files = kwds.get("job_output_files", None) | |
54 if job_output_files is None: | |
55 job_output_files = os.path.join(config_directory, "jobfiles") | |
56 | |
57 xunit_report_file = _xunit_state(kwds, config) | |
58 xunit_report_file_tracker = _FileChangeTracker(xunit_report_file) | |
59 structured_report_file = _structured_report_file(kwds, config) | |
60 structured_report_file_tracker = _FileChangeTracker(structured_report_file) | |
61 | |
62 info("Testing using galaxy_root %s", config.galaxy_root) | |
63 # TODO: Allow running dockerized Galaxy here instead. | |
64 server_ini = os.path.join(config_directory, "galaxy.ini") | |
65 config.env["GALAXY_CONFIG_FILE"] = server_ini | |
66 config.env["GALAXY_TEST_VERBOSE_ERRORS"] = "true" | |
67 config.env["GALAXY_TEST_SAVE"] = job_output_files | |
68 | |
69 cd_to_galaxy_command = ['cd', config.galaxy_root] | |
70 test_cmd = test_structures.GalaxyTestCommand( | |
71 html_report_file, | |
72 xunit_report_file, | |
73 structured_report_file, | |
74 failed=kwds.get("failed", False), | |
75 installed=kwds.get("installed", False), | |
76 ).build() | |
77 setup_common_startup_args = "" | |
78 if kwds.get("skip_venv", False): | |
79 setup_common_startup_args = shell_join( | |
80 'COMMON_STARTUP_ARGS=--skip-venv', | |
81 'export COMMON_STARTUP_ARGS', | |
82 'echo "Set COMMON_STARTUP_ARGS to ${COMMON_STARTUP_ARGS}"', | |
83 ) | |
84 setup_venv_command = setup_venv(ctx, kwds) | |
85 cmd = shell_join( | |
86 cd_to_galaxy_command, | |
87 setup_common_startup_args, | |
88 setup_venv_command, | |
89 test_cmd, | |
90 ) | |
91 action = "Testing tools" | |
92 return_code = run( | |
93 ctx, | |
94 cmd, | |
95 config.env, | |
96 action | |
97 ) | |
98 if kwds.get('update_test_data', False): | |
99 copy_tree(job_output_files, test_data_target_dir or config.test_data_dir) | |
100 | |
101 _check_test_outputs(xunit_report_file_tracker, structured_report_file_tracker) | |
102 test_results = test_structures.GalaxyTestResults( | |
103 structured_report_file, | |
104 xunit_report_file, | |
105 html_report_file, | |
106 return_code, | |
107 ) | |
108 | |
109 structured_data = test_results.structured_data | |
110 return handle_reports_and_summary( | |
111 ctx, | |
112 structured_data, | |
113 exit_code=test_results.exit_code, | |
114 kwds=kwds | |
115 ) | |
116 | |
117 | |
118 def handle_reports_and_summary(ctx, structured_data, exit_code=None, kwds={}): | |
119 """Produce reports and print summary, return 0 if tests passed. | |
120 | |
121 If ``exit_code`` is set - use underlying test source for return | |
122 code and test success determination, otherwise infer from supplied | |
123 test data. | |
124 """ | |
125 handle_reports(ctx, structured_data, kwds) | |
126 summary_exit_code = _handle_summary( | |
127 structured_data, | |
128 **kwds | |
129 ) | |
130 return exit_code if exit_code is not None else summary_exit_code | |
131 | |
132 | |
133 def merge_reports(input_paths, output_path): | |
134 reports = [] | |
135 for path in input_paths: | |
136 with io.open(path, encoding='utf-8') as f: | |
137 reports.append(json.load(f)) | |
138 tests = [] | |
139 for report in reports: | |
140 tests.extend(report["tests"]) | |
141 merged_report = {"tests": tests} | |
142 with io.open(output_path, mode="w", encoding='utf-8') as out: | |
143 out.write(unicodify(json.dumps(merged_report))) | |
144 | |
145 | |
146 def handle_reports(ctx, structured_data, kwds): | |
147 """Write reports based on user specified kwds.""" | |
148 exceptions = [] | |
149 structured_report_file = kwds.get("test_output_json", None) | |
150 if structured_report_file and not os.path.exists(structured_report_file): | |
151 try: | |
152 with io.open(structured_report_file, mode="w", encoding='utf-8') as f: | |
153 f.write(unicodify(json.dumps(structured_data))) | |
154 except Exception as e: | |
155 exceptions.append(e) | |
156 | |
157 for report_type in ["html", "markdown", "text", "xunit", "junit"]: | |
158 try: | |
159 _handle_test_output_file( | |
160 ctx, report_type, structured_data, kwds | |
161 ) | |
162 except Exception as e: | |
163 exceptions.append(e) | |
164 | |
165 if len(exceptions) > 0: | |
166 raise exceptions[0] | |
167 | |
168 | |
169 def _handle_test_output_file(ctx, report_type, test_data, kwds): | |
170 kwd_name = "test_output" | |
171 if report_type != "html": | |
172 kwd_name = "test_output_%s" % report_type | |
173 | |
174 path = kwds.get(kwd_name, None) | |
175 if path is None: | |
176 message = "No file specified for %s, skipping test output." % kwd_name | |
177 ctx.vlog(message) | |
178 return | |
179 | |
180 try: | |
181 contents = build_report.build_report( | |
182 test_data, report_type=report_type | |
183 ) | |
184 except Exception: | |
185 message = "Problem producing report file %s for %s" % ( | |
186 path, kwd_name | |
187 ) | |
188 ctx.vlog(message, exception=True) | |
189 raise | |
190 | |
191 try: | |
192 with io.open(path, mode='w', encoding='utf-8') as handle: | |
193 handle.write(unicodify(contents)) | |
194 except Exception: | |
195 message = "Problem writing output file %s for %s" % ( | |
196 kwd_name, path | |
197 ) | |
198 ctx.vlog(message, exception=True) | |
199 raise | |
200 | |
201 | |
202 def _handle_summary( | |
203 structured_data, | |
204 **kwds | |
205 ): | |
206 summary_dict = get_dict_value("summary", structured_data) | |
207 num_tests = get_dict_value("num_tests", summary_dict) | |
208 num_failures = get_dict_value("num_failures", summary_dict) | |
209 num_errors = get_dict_value("num_errors", summary_dict) | |
210 num_problems = num_failures + num_errors | |
211 | |
212 summary_exit_code = EXIT_CODE_OK | |
213 if num_problems > 0: | |
214 summary_exit_code = EXIT_CODE_GENERIC_FAILURE | |
215 elif num_tests == 0: | |
216 summary_exit_code = EXIT_CODE_NO_SUCH_TARGET | |
217 | |
218 summary_style = kwds.get("summary") | |
219 if summary_style != "none": | |
220 if num_tests == 0: | |
221 warn(NO_TESTS_MESSAGE) | |
222 elif num_problems == 0: | |
223 info(ALL_TESTS_PASSED_MESSAGE % num_tests) | |
224 elif num_problems: | |
225 html_report_file = kwds.get("test_output") | |
226 message_args = (num_problems, num_tests, html_report_file) | |
227 message = PROBLEM_COUNT_MESSAGE % message_args | |
228 warn(message) | |
229 | |
230 _summarize_tests_full( | |
231 structured_data, | |
232 **kwds | |
233 ) | |
234 | |
235 return summary_exit_code | |
236 | |
237 | |
238 def _summarize_tests_full( | |
239 structured_data, | |
240 **kwds | |
241 ): | |
242 tests = get_dict_value("tests", structured_data) | |
243 for test_case_data in tests: | |
244 _summarize_test_case(test_case_data, **kwds) | |
245 | |
246 | |
247 def passed(xunit_testcase_el): | |
248 did_pass = True | |
249 for child_el in list(xunit_testcase_el): | |
250 if child_el.tag in ["failure", "error"]: | |
251 did_pass = False | |
252 return did_pass | |
253 | |
254 | |
255 def _summarize_test_case(structured_data, **kwds): | |
256 summary_style = kwds.get("summary") | |
257 test_id = test_structures.case_id( | |
258 raw_id=get_dict_value("id", structured_data) | |
259 ) | |
260 status = get_dict_value( | |
261 "status", | |
262 get_dict_value("data", structured_data) | |
263 ) | |
264 if status != "success": | |
265 state = click.style("failed", bold=True, fg='red') | |
266 else: | |
267 state = click.style("passed", bold=True, fg='green') | |
268 click.echo(test_id.label + ": " + state) | |
269 if summary_style != "minimal": | |
270 _print_command_line(structured_data, test_id) | |
271 | |
272 | |
273 def _print_command_line(test, test_id): | |
274 execution_problem = test.get("execution_problem", None) | |
275 if execution_problem: | |
276 click.echo("| command: *could not execute job, no command generated* ") | |
277 return | |
278 | |
279 try: | |
280 command = test["job"]["command_line"] | |
281 except (KeyError, IndexError): | |
282 click.echo("| command: *failed to determine command for job* ") | |
283 return | |
284 | |
285 click.echo("| command: %s" % command) | |
286 | |
287 | |
288 def _check_test_outputs( | |
289 xunit_report_file_tracker, | |
290 structured_report_file_tracker | |
291 ): | |
292 if not os.path.exists(xunit_report_file_tracker.path): | |
293 message = NO_XUNIT_REPORT_MESSAGE % xunit_report_file_tracker.path | |
294 error(message) | |
295 raise Exception(message) | |
296 | |
297 if not os.path.exists(structured_report_file_tracker.path): | |
298 message = NO_JSON_REPORT_MESSAGE % structured_report_file_tracker.path | |
299 error(message) | |
300 raise Exception(message) | |
301 | |
302 if not xunit_report_file_tracker.changed(): | |
303 message = REPORT_NOT_CHANGED % xunit_report_file_tracker.path | |
304 error(message) | |
305 raise Exception(message) | |
306 | |
307 if not structured_report_file_tracker.changed(): | |
308 message = REPORT_NOT_CHANGED % structured_report_file_tracker.path | |
309 error(message) | |
310 raise Exception(message) | |
311 | |
312 | |
313 def _xunit_state(kwds, config): | |
314 # This has been supported in Galaxy for well over a year, just going to assume | |
315 # it from here on out. | |
316 # xunit_supported = True | |
317 # if shell("grep -q xunit '%s'/run_tests.sh" % config.galaxy_root): | |
318 # xunit_supported = False | |
319 | |
320 xunit_report_file = kwds.get("test_output_xunit", None) | |
321 if xunit_report_file is None: | |
322 xunit_report_file = os.path.join(config.config_directory, "xunit.xml") | |
323 | |
324 return xunit_report_file | |
325 | |
326 | |
327 def _structured_report_file(kwds, config): | |
328 # This has been supported in Galaxy for well over a year, just going to assume | |
329 # it from here on out. | |
330 # structured_data_supported = True | |
331 # if shell("grep -q structured_data '%s'/run_tests.sh" % config.galaxy_root): | |
332 # structured_data_supported = False | |
333 | |
334 structured_report_file = None | |
335 structured_report_file = kwds.get("test_output_json", None) | |
336 if structured_report_file is None: | |
337 conf_dir = config.config_directory | |
338 structured_report_file = os.path.join(conf_dir, "structured_data.json") | |
339 | |
340 return structured_report_file | |
341 | |
342 | |
343 class _FileChangeTracker(object): | |
344 | |
345 def __init__(self, path): | |
346 modification_time = None | |
347 if os.path.exists(path): | |
348 modification_time = os.path.getmtime(path) | |
349 | |
350 self.path = path | |
351 self.modification_time = modification_time | |
352 | |
353 def changed(self): | |
354 if self.modification_time: | |
355 new_modification_time = os.path.getmtime(self.path) | |
356 return self.modification_time != new_modification_time | |
357 else: | |
358 return os.path.exists(self.path) | |
359 | |
360 | |
361 __all__ = ( | |
362 "run_in_config", | |
363 "handle_reports", | |
364 "handle_reports_and_summary", | |
365 ) |