comparison env/lib/python3.7/site-packages/planemo/galaxy/test/actions.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """Actions related to running and reporting on Galaxy-specific testing."""
2
3 import io
4 import json
5 import os
6 from distutils.dir_util import copy_tree
7
8 import click
9 from galaxy.util import unicodify
10
11 from planemo.exit_codes import (
12 EXIT_CODE_GENERIC_FAILURE,
13 EXIT_CODE_NO_SUCH_TARGET,
14 EXIT_CODE_OK,
15 )
16 from planemo.galaxy.run import (
17 run_galaxy_command,
18 setup_venv,
19 )
20 from planemo.io import error, info, shell_join, warn
21 from planemo.reports import build_report
22 from planemo.test.results import get_dict_value
23 from . import structures as test_structures
24
25
# Messages raised/logged when Galaxy's report files are missing or stale.
# Each takes the offending report path as its single ``%s`` argument.
NO_XUNIT_REPORT_MESSAGE = (
    "Cannot locate xUnit report [%s] for tests - required to build "
    "planemo report and summarize tests."
)
NO_JSON_REPORT_MESSAGE = (
    "Cannot locate JSON report [%s] for tests - required to build "
    "planemo report and summarize tests."
)
REPORT_NOT_CHANGED = (
    "Galaxy failed to update test report [%s] for tests - required to build "
    "planemo report and summarize tests."
)

# Messages used when summarizing test results for the user.
NO_TESTS_MESSAGE = "No tests were executed - see Galaxy output for details."
ALL_TESTS_PASSED_MESSAGE = "All %d test(s) executed passed."
# Arguments: (number of problems, number of tests, path of the HTML report).
PROBLEM_COUNT_MESSAGE = (
    "There were problems with %d test(s) - out of %d test(s) executed. "
    "See %s for detailed breakdown."
)
GENERIC_PROBLEMS_MESSAGE = (
    "One or more tests failed. See %s for detailed breakdown."
)
GENERIC_TESTS_PASSED_MESSAGE = "No failing tests encountered."
42
43
def run_in_config(ctx, config, run=run_galaxy_command, test_data_target_dir=None, **kwds):
    """Run Galaxy's tests for ``config`` and report on the outcome.

    A shell command is assembled that changes into ``config.galaxy_root``,
    optionally exports ``COMMON_STARTUP_ARGS=--skip-venv`` (when the
    ``skip_venv`` keyword is truthy), runs whatever ``setup_venv`` returns
    and finally the Galaxy test command.  The command is executed through
    ``run`` with ``config.env`` as its environment.

    ``kwds`` must contain ``test_output`` (path of the HTML report).  When
    ``update_test_data`` is truthy the saved job outputs are copied to
    ``test_data_target_dir`` or, failing that, ``config.test_data_dir``.

    Returns the value produced by ``handle_reports_and_summary``.  Raises
    if the xUnit or JSON report is missing or was not updated by the run.
    """
    config_dir = config.config_directory
    html_report = kwds["test_output"]

    job_output_files = kwds.get("job_output_files", None)
    if job_output_files is None:
        job_output_files = os.path.join(config_dir, "jobfiles")

    # Snapshot both report files *before* the run so that stale reports
    # left over from an earlier run can be detected afterwards.
    xunit_report = _xunit_state(kwds, config)
    xunit_tracker = _FileChangeTracker(xunit_report)
    structured_report = _structured_report_file(kwds, config)
    structured_tracker = _FileChangeTracker(structured_report)

    info("Testing using galaxy_root %s", config.galaxy_root)
    # TODO: Allow running dockerized Galaxy here instead.
    config.env["GALAXY_CONFIG_FILE"] = os.path.join(config_dir, "galaxy.ini")
    config.env["GALAXY_TEST_VERBOSE_ERRORS"] = "true"
    config.env["GALAXY_TEST_SAVE"] = job_output_files

    change_directory = ['cd', config.galaxy_root]
    galaxy_test_command = test_structures.GalaxyTestCommand(
        html_report,
        xunit_report,
        structured_report,
        failed=kwds.get("failed", False),
        installed=kwds.get("installed", False),
    ).build()

    if kwds.get("skip_venv", False):
        startup_args = shell_join(
            'COMMON_STARTUP_ARGS=--skip-venv',
            'export COMMON_STARTUP_ARGS',
            'echo "Set COMMON_STARTUP_ARGS to ${COMMON_STARTUP_ARGS}"',
        )
    else:
        startup_args = ""

    full_command = shell_join(
        change_directory,
        startup_args,
        setup_venv(ctx, kwds),
        galaxy_test_command,
    )
    return_code = run(ctx, full_command, config.env, "Testing tools")

    if kwds.get('update_test_data', False):
        copy_tree(job_output_files, test_data_target_dir or config.test_data_dir)

    _check_test_outputs(xunit_tracker, structured_tracker)
    results = test_structures.GalaxyTestResults(
        structured_report,
        xunit_report,
        html_report,
        return_code,
    )

    return handle_reports_and_summary(
        ctx,
        results.structured_data,
        exit_code=results.exit_code,
        kwds=kwds
    )
116
117
def handle_reports_and_summary(ctx, structured_data, exit_code=None, kwds=None):
    """Produce reports and print summary, return 0 if tests passed.

    If ``exit_code`` is set - use underlying test source for return
    code and test success determination, otherwise infer from supplied
    test data.

    ``kwds`` holds the user-supplied options that are forwarded to
    ``handle_reports`` and ``_handle_summary``; ``None`` (the default) is
    treated as "no options".
    """
    # A ``None`` sentinel replaces the former ``kwds={}`` default so that no
    # dictionary object is shared between separate calls.
    if kwds is None:
        kwds = {}

    handle_reports(ctx, structured_data, kwds)
    summary_exit_code = _handle_summary(
        structured_data,
        **kwds
    )
    return exit_code if exit_code is not None else summary_exit_code
131
132
def merge_reports(input_paths, output_path):
    """Concatenate the ``tests`` lists of several JSON reports into one file.

    Every file in ``input_paths`` is parsed as UTF-8 JSON; the entries under
    each report's ``"tests"`` key are combined, in input order, into a new
    ``{"tests": [...]}`` document that is written to ``output_path``.
    """
    def _read_report(report_path):
        with io.open(report_path, encoding='utf-8') as handle:
            return json.load(handle)

    # Parse every input first, then gather their tests in order.
    parsed_reports = [_read_report(report_path) for report_path in input_paths]
    combined_tests = [
        test
        for parsed in parsed_reports
        for test in parsed["tests"]
    ]

    with io.open(output_path, mode="w", encoding='utf-8') as out:
        out.write(unicodify(json.dumps({"tests": combined_tests})))
144
145
def handle_reports(ctx, structured_data, kwds):
    """Write every report the user asked for via ``kwds``.

    The JSON report (``test_output_json``) is written only when a path was
    given and no file exists there yet.  The html, markdown, text, xunit and
    junit reports are then attempted in that order.  A failure in one report
    does not stop the others; once all have been attempted, the first
    exception encountered (if any) is raised.
    """
    first_failure = None

    json_path = kwds.get("test_output_json", None)
    if json_path and not os.path.exists(json_path):
        try:
            with io.open(json_path, mode="w", encoding='utf-8') as handle:
                handle.write(unicodify(json.dumps(structured_data)))
        except Exception as exc:
            first_failure = exc

    for report_type in ("html", "markdown", "text", "xunit", "junit"):
        try:
            _handle_test_output_file(ctx, report_type, structured_data, kwds)
        except Exception as exc:
            # Only the earliest problem is reported to the caller.
            if first_failure is None:
                first_failure = exc

    if first_failure is not None:
        raise first_failure
167
168
def _handle_test_output_file(ctx, report_type, test_data, kwds):
    """Build the ``report_type`` report from ``test_data`` and write it out.

    The destination is read from ``kwds``: key ``test_output`` for the
    ``html`` report and ``test_output_<report_type>`` for all other types.
    If that key is missing or ``None`` a verbose log line is emitted and
    nothing is written.

    Any exception raised while building or writing the report is logged via
    ``ctx.vlog`` and then re-raised unchanged.
    """
    kwd_name = "test_output"
    if report_type != "html":
        kwd_name = "test_output_%s" % report_type

    path = kwds.get(kwd_name, None)
    if path is None:
        message = "No file specified for %s, skipping test output." % kwd_name
        ctx.vlog(message)
        return

    try:
        contents = build_report.build_report(
            test_data, report_type=report_type
        )
    except Exception:
        message = "Problem producing report file %s for %s" % (
            path, kwd_name
        )
        ctx.vlog(message, exception=True)
        raise

    try:
        with io.open(path, mode='w', encoding='utf-8') as handle:
            handle.write(unicodify(contents))
    except Exception:
        # The file path comes first and the option name second, matching the
        # wording of the message and the "producing" message above (the
        # arguments used to be passed in the opposite order).
        message = "Problem writing output file %s for %s" % (
            path, kwd_name
        )
        ctx.vlog(message, exception=True)
        raise
200
201
def _handle_summary(structured_data, **kwds):
    """Print a summary of the test run and return the matching exit code.

    Counts are read from the ``summary`` entry of ``structured_data``.  The
    returned code is ``EXIT_CODE_GENERIC_FAILURE`` when any failures or
    errors were recorded, ``EXIT_CODE_NO_SUCH_TARGET`` when no tests ran and
    ``EXIT_CODE_OK`` otherwise.  Nothing is printed when the ``summary``
    keyword equals ``"none"``.
    """
    summary = get_dict_value("summary", structured_data)
    test_count = get_dict_value("num_tests", summary)
    failure_count = get_dict_value("num_failures", summary)
    error_count = get_dict_value("num_errors", summary)
    problem_count = failure_count + error_count

    if problem_count > 0:
        exit_code = EXIT_CODE_GENERIC_FAILURE
    elif test_count == 0:
        exit_code = EXIT_CODE_NO_SUCH_TARGET
    else:
        exit_code = EXIT_CODE_OK

    if kwds.get("summary") == "none":
        return exit_code

    if test_count == 0:
        warn(NO_TESTS_MESSAGE)
    elif problem_count == 0:
        info(ALL_TESTS_PASSED_MESSAGE % test_count)
    else:
        warn(PROBLEM_COUNT_MESSAGE % (
            problem_count, test_count, kwds.get("test_output")
        ))

    # Per-test breakdown follows the headline message.
    _summarize_tests_full(structured_data, **kwds)
    return exit_code
236
237
def _summarize_tests_full(structured_data, **kwds):
    """Print a one-test summary for each entry under ``tests``."""
    for case in get_dict_value("tests", structured_data):
        _summarize_test_case(case, **kwds)
245
246
def passed(xunit_testcase_el):
    """Return ``True`` unless a direct child is tagged failure or error.

    ``xunit_testcase_el`` is iterated for its children; each child's ``tag``
    attribute is compared against ``"failure"`` and ``"error"``.
    """
    child_tags = [child.tag for child in list(xunit_testcase_el)]
    return not any(tag in ("failure", "error") for tag in child_tags)
253
254
def _summarize_test_case(structured_data, **kwds):
    """Echo a colored passed/failed line for a single test case.

    The test is reported as passed only when the ``status`` found under its
    ``data`` entry equals ``"success"``.  Unless the ``summary`` keyword is
    ``"minimal"``, the job's command line is printed as well.
    """
    test_id = test_structures.case_id(
        raw_id=get_dict_value("id", structured_data)
    )
    case_data = get_dict_value("data", structured_data)
    succeeded = get_dict_value("status", case_data) == "success"

    if succeeded:
        state = click.style("passed", bold=True, fg='green')
    else:
        state = click.style("failed", bold=True, fg='red')
    click.echo(test_id.label + ": " + state)

    if kwds.get("summary") != "minimal":
        _print_command_line(structured_data, test_id)
271
272
def _print_command_line(test, test_id):
    """Echo the command line recorded for ``test``'s job, if there is one.

    A placeholder line is printed instead when the test carries a truthy
    ``execution_problem`` or when ``test["job"]["command_line"]`` cannot be
    looked up.  ``test_id`` is accepted but not used.
    """
    if test.get("execution_problem"):
        click.echo("| command: *could not execute job, no command generated* ")
        return

    try:
        command = test["job"]["command_line"]
    except (KeyError, IndexError):
        click.echo("| command: *failed to determine command for job* ")
    else:
        click.echo("| command: %s" % command)
286
287
def _check_test_outputs(
    xunit_report_file_tracker,
    structured_report_file_tracker
):
    """Verify that both report files exist and were updated by the run.

    Checks are performed in this order: xUnit report exists, JSON report
    exists, xUnit report changed, JSON report changed.  The first failing
    check logs its message through ``error`` and raises ``Exception`` with
    the same message.
    """
    existence_checks = (
        (xunit_report_file_tracker, NO_XUNIT_REPORT_MESSAGE),
        (structured_report_file_tracker, NO_JSON_REPORT_MESSAGE),
    )
    for tracker, template in existence_checks:
        if not os.path.exists(tracker.path):
            message = template % tracker.path
            error(message)
            raise Exception(message)

    for tracker in (xunit_report_file_tracker, structured_report_file_tracker):
        if not tracker.changed():
            message = REPORT_NOT_CHANGED % tracker.path
            error(message)
            raise Exception(message)
311
312
def _xunit_state(kwds, config):
    """Return the path at which the xUnit report should be written.

    Uses the ``test_output_xunit`` entry of ``kwds`` when it is present and
    not ``None``; otherwise defaults to ``xunit.xml`` inside
    ``config.config_directory``.
    """
    # xUnit output has been supported by Galaxy for a long time, so support
    # is assumed rather than probed for in run_tests.sh.
    requested_path = kwds.get("test_output_xunit", None)
    if requested_path is not None:
        return requested_path
    return os.path.join(config.config_directory, "xunit.xml")
325
326
def _structured_report_file(kwds, config):
    """Return the path at which the structured JSON report should be written.

    Uses the ``test_output_json`` entry of ``kwds`` when it is present and
    not ``None``; otherwise defaults to ``structured_data.json`` inside
    ``config.config_directory``.
    """
    # Structured data output has been supported by Galaxy for a long time,
    # so support is assumed rather than probed for in run_tests.sh.
    requested_path = kwds.get("test_output_json", None)
    if requested_path is not None:
        return requested_path
    return os.path.join(config.config_directory, "structured_data.json")
341
342
class _FileChangeTracker(object):
    """Remember a file's modification time to later detect whether it changed.

    ``path`` is the file being watched.  ``modification_time`` is the mtime
    recorded at construction, or ``None`` if the file did not exist then.
    """

    def __init__(self, path):
        """Record the current modification time of ``path`` (if it exists)."""
        modification_time = None
        if os.path.exists(path):
            modification_time = os.path.getmtime(path)

        self.path = path
        self.modification_time = modification_time

    def changed(self):
        """Return ``True`` if the file differs from the recorded snapshot.

        If the file did not exist at construction, it counts as changed once
        it exists.  Otherwise it counts as changed when its modification
        time differs from the recorded one, or when it has since been
        removed.
        """
        # Compare against ``None`` explicitly: a recorded mtime of 0 is a
        # valid snapshot and must not be mistaken for "file was absent".
        if self.modification_time is None:
            return os.path.exists(self.path)

        try:
            new_modification_time = os.path.getmtime(self.path)
        except OSError:
            # The file existed at snapshot time but is gone (or unreadable)
            # now, which is a change relative to the snapshot.
            return True
        return self.modification_time != new_modification_time
359
360
# Names exported by ``from <this module> import *``.
__all__ = (
    "run_in_config",
    "handle_reports",
    "handle_reports_and_summary",
)