Mercurial > repos > galaxyp > openms_cometadapter
comparison get_tests.py @ 9:df0c77da6c87 draft default tip
planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/openms commit 5c080b1e2b99f1c88f4557e9fec8c45c9d23b906
author | galaxyp |
---|---|
date | Fri, 14 Jun 2024 21:35:36 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
8:e75929491905 | 9:df0c77da6c87 |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 import argparse | |
4 import os.path | |
5 import re | |
6 import shlex | |
7 import sys | |
8 import tempfile | |
9 from typing import ( | |
10 Dict, | |
11 List, | |
12 Optional, | |
13 TextIO, | |
14 Tuple, | |
15 ) | |
16 | |
17 from ctdconverter.common.utils import ( | |
18 ParameterHardcoder, | |
19 parse_hardcoded_parameters, | |
20 parse_input_ctds, | |
21 ) | |
22 from ctdconverter.galaxy.converter import convert_models | |
23 from CTDopts.CTDopts import ( | |
24 CTDModel, | |
25 ModelTypeError, | |
26 Parameters, | |
27 ) | |
28 | |
# Regular expressions identifying tests that are not converted to Galaxy tests.
# The list is handed to process_test_line(), which drops a test as soon as one
# of the patterns is found (re.search) in the test command line or the test id.
SKIP_LIST = [
    r"_prepare\"",
    r"_convert",
    r"WRITEINI",
    r"WRITECTD",
    r"INVALIDVALUE",
    r"\.ini\.json",
    r"OpenSwathMzMLFileCacher .*-convert_back",  # - OpenSwathMzMLFileCacher with -convert_back argument https://github.com/OpenMS/OpenMS/issues/4399
    r"MaRaClusterAdapter.*-consensus_out",  # - MaRaCluster with -consensus_out (parameter blacklister: https://github.com/OpenMS/OpenMS/issues/4456)
    r"FileMerger_1_input1.dta2d.*FileMerger_1_input2.dta ",  # - FileMerger with mixed dta dta2d input (ftype can not be specified in the test, dta can not be sniffed)
    r'^(TOPP_OpenSwathAnalyzer_test_3|TOPP_OpenSwathAnalyzer_test_4)$',  # no support for cached mzML
    r'TOPP_SiriusAdapter_[0-9]+$',  # Do not test SiriusAdapter https://github.com/OpenMS/OpenMS/issues/7000 .. will be removed anyway
    r'TOPP_AssayGeneratorMetabo_(7|8|9|10|11|12|13|14|15|16|17|18)$'  # Skip AssayGeneratorMetabo tests using Sirius https://github.com/OpenMS/OpenMS/issues/7150 (will be replaced by two tools)
]
43 | |
44 | |
def get_failing_tests(cmake: List[str]) -> List[str]:
    """
    collect the ids of all tests that are declared as expected failures, i.e.
    that appear in a set_tests_properties("ID" PROPERTIES WILL_FAIL 1) statement

    cmake: paths of the CMake files to scan
    returns the test ids in the order in which they are found
    """
    re_fail = re.compile(r"set_tests_properties\(\"([^\"]+)\" PROPERTIES WILL_FAIL 1\)")
    failing_tests = []

    # scan the files passed as argument (and not the global command line
    # arguments), so that the function works for any caller
    for cmake_path in cmake:
        with open(cmake_path) as cmake_fh:
            for line in cmake_fh:
                match = re_fail.search(line)
                if match:
                    failing_tests.append(match.group(1))
    return failing_tests
56 | |
57 | |
def fix_tmp_files(line: str, diff_pairs: Dict[str, str]) -> str:
    """
    OpenMS tests output to tmp files and compare with FuzzyDiff to the expected file.
    problem: the extension of the tmp files is unusable for test generation.
    unfortunately the extensions used in the DIFF lines are not always usable for the CLI
    (e.g. for prepare_test_data, e.g. CLI expects csv but test file is txt)
    this function replaces the tmp file by the expected file.

    line: the command line of the test
    diff_pairs: maps the file name found on the command line to the file name
        that is used instead
    returns the command line with all arguments found in diff_pairs replaced

    side effect: for each replaced argument test-data/<replacement> is
    (re)created as symlink pointing to the original argument
    """
    cmd = shlex.split(line)
    for i, e in enumerate(cmd):
        if e not in diff_pairs:
            continue
        replacement = diff_pairs[e]
        dst = os.path.join("test-data", replacement)
        # lexists (in contrast to exists) is also true for dangling symlinks,
        # e.g. a link left over from a previous run, which would otherwise
        # make os.symlink fail with FileExistsError
        if os.path.lexists(dst):
            os.unlink(dst)
        sys.stderr.write(f"symlink {e} {dst}\n")
        os.symlink(e, dst)
        cmd[i] = replacement
    return shlex.join(cmd)
76 | |
77 | |
def get_ini(line: str, tool_id: str) -> Tuple[str, str]:
    """
    if there is an ini file then we use this to generate the test
    otherwise the ctd file is used
    other command line parameters are inserted later into this xml

    line: the command line of the test
    tool_id: id of the tool, used to derive the name of the ctd file
    returns a pair consisting of
    - the path of the ini file (in test-data/) or of the ctd file (in ctd/)
    - the command line; if an ini file was given all "-ini FILE" pairs are
      removed, otherwise the line is returned unchanged

    if -ini is given more than once the last value is used. a trailing -ini
    without a value is not treated as ini argument and stays on the command line
    """
    tokens = shlex.split(line)
    ini = None
    # build a new token list instead of slicing the list that is iterated:
    # the indexes stay valid also if -ini appears multiple times
    remaining = []
    pos = 0
    while pos < len(tokens):
        if tokens[pos] == "-ini" and pos + 1 < len(tokens):
            ini = tokens[pos + 1]
            pos += 2
            continue
        remaining.append(tokens[pos])
        pos += 1
    if ini:
        return os.path.join("test-data", ini), shlex.join(remaining)
    return os.path.join("ctd", f"{tool_id}.ctd"), line
94 | |
95 | |
def unique_files(line: str):
    """
    planemo tests can not use the same file for more than one parameter.
    therefore each file (i.e. each argument that names something in
    test-data/) that is used at least twice on the command line gets one
    symlink per use (NAME_0.ext, NAME_1.ext, ...) in test-data/ and the
    returned command line refers to the symlinks instead of the file
    """
    tokens = shlex.split(line)
    # for each file argument the positions at which it is used
    positions = {}
    for pos, token in enumerate(tokens):
        candidate = os.path.join("test-data", token)
        if os.path.exists(candidate) or os.path.islink(candidate):
            positions.setdefault(token, []).append(pos)

    for name, used_at in positions.items():
        if len(used_at) < 2:
            continue
        # the counter is appended to the part in front of the first dot
        stem, *extensions = name.split(".")
        for count, pos in enumerate(used_at):
            alias = ".".join([f"{stem}_{count}", *extensions])
            link = os.path.join("test-data", alias)
            sys.stderr.write(f"\tsymlink {link} {name}\n")
            try:
                os.symlink(name, link)
            except FileExistsError:
                # link is already there, e.g. from an earlier test
                pass
            tokens[pos] = alias
    return shlex.join(tokens)
132 | |
133 | |
def fill_ctd_clargs(ini: str, line: str, ctd_tmp: TextIO) -> None:
    """
    write a ctd file whose parameter values are the values from the given
    ini/ctd file overwritten by the values given on the command line

    ini: path of the ini or ctd file
    line: the command line of the test
    ctd_tmp: open file handle to which the ctd is written
    raises ValueError if the file can be loaded neither as CTDModel nor as
        Parameters
    """
    cmd = shlex.split(line)

    # load the file: both types are tried (as CTDModel and then as
    # Parameters), the last one that can be loaded is used
    ini_model = None
    for model_type in (CTDModel, Parameters):
        try:
            ini_model = model_type(from_file=ini)
        except ModelTypeError:
            continue
    if ini_model is None:
        raise ValueError(f"Could not parse {ini}, seems to be no CTD/PARAMS")

    # get a dictionary of the ctd arguments where the values of the parameters
    # given on the command line are overwritten
    ini_values = ini_model.parse_cl_args(cl_args=cmd, ignore_required=True)
    ini_model.write_ctd(ctd_tmp, ini_values)
155 | |
156 | |
def process_test_line(
    id: str,
    line: str,
    failing_tests: List[str],
    skip_list: List[str],
    diff_pairs: Dict[str, str],
) -> Optional[str]:
    """
    create the Galaxy test (xml) for a single add_test(... statement

    id: id of the tool for which tests are generated
    line: the add_test statement (joined to a single line, without the
        trailing parenthesis)
    failing_tests: ids of the tests that are expected to fail
    skip_list: regular expressions; a test is skipped if one of them is found
        in its command line or test id
    diff_pairs: file name replacements that are applied by fix_tmp_files
    returns the xml of the test prefixed by a comment containing the test id,
        or None if the test is skipped

    the program exits if the line is no well formed add_test statement
    """
    re_test_id = re.compile(r"add_test\(\"([^\"]+)\" ([^ ]+) (.*)")
    re_id_out_test = re.compile(r"_out_?[0-9]?")

    # TODO auto extract from set(OLD_OSW_PARAM ... lin
    line = line.replace(
        "${OLD_OSW_PARAM}",
        " -test -mz_extraction_window 0.05 -mz_extraction_window_unit Th -ms1_isotopes 0 -Scoring:TransitionGroupPicker:compute_peak_quality -Scoring:Scores:use_ms1_mi false -Scoring:Scores:use_mi_score false",
    )

    line = line.replace("${TOPP_BIN_PATH}/", "")
    line = line.replace("${DATA_DIR_TOPP}/", "")
    line = line.replace("THIRDPARTY/", "")
    line = line.replace("${DATA_DIR_SHARE}/", "")
    # IDRipper PATH gets empty causing problems. TODO But overall the option needs to be handled differently
    line = line.replace("${TMP_RIP_PATH}/", "")
    # some input files are originally in a subdir (degenerated cases/), but not in test-data
    line = line.replace("degenerate_cases/", "")
    # determine the test and tool ids and remove the 1) add_test("TESTID" 2) trailing )
    match = re_test_id.match(line)
    if not match:
        sys.exit(f"Ill formated test line {line}\n")
    test_id = match.group(1)
    tool_id = match.group(2)

    line = f"{match.group(2)} {match.group(3)}"

    if test_id in failing_tests:
        sys.stderr.write(f"    skip failing {test_id} {line}\n")
        return None

    if id != tool_id:
        sys.stderr.write(f"    skip {test_id} ({id} != {tool_id}) {line}\n")
        return None

    if re_id_out_test.search(test_id):
        sys.stderr.write(f"    skip {test_id} {line}\n")
        return None

    for skip in skip_list:
        if re.search(skip, line) or re.search(skip, test_id):
            return None

    line = fix_tmp_files(line, diff_pairs)
    line = unique_files(line)
    ini, line = get_ini(line, tool_id)

    from dataclasses import dataclass, field

    # stand-in for the parsed command line arguments that convert_models
    # receives as its first argument
    @dataclass
    class CTDConverterArgs:
        input_files: list
        output_destination: str
        default_executable_path: Optional[str] = None
        hardcoded_parameters: Optional[str] = None
        parameter_hardcoder: Optional[ParameterHardcoder] = None
        xsd_location: Optional[str] = None
        formats_file: Optional[str] = None
        add_to_command_line: str = ""
        required_tools_file: Optional[str] = None
        skip_tools_file: Optional[str] = None
        macros_files: Optional[List[str]] = field(default_factory=list)
        test_macros_files: Optional[List[str]] = field(default_factory=list)
        test_macros_prefix: Optional[List[str]] = field(default_factory=list)
        test_test: bool = False
        test_only: bool = False
        test_unsniffable: Optional[List[str]] = field(default_factory=list)
        test_condition: Optional[List[str]] = ("compare=sim_size", "delta_frac=0.05")
        tool_version: Optional[str] = None
        tool_profile: Optional[str] = None
        bump_file: Optional[str] = None

    # create an ini/ctd file where the values are equal to the arguments from the command line
    # and transform it to xml
    test = [f"<!-- {test_id} -->\n"]
    with tempfile.NamedTemporaryFile(
        mode="w+", delete_on_close=False
    ) as ctd_tmp, tempfile.NamedTemporaryFile(
        mode="w+", delete_on_close=False
    ) as xml_tmp:
        fill_ctd_clargs(ini, line, ctd_tmp)
        # close both files such that they can be accessed by name; they are
        # deleted only when the with block is left
        ctd_tmp.close()
        xml_tmp.close()
        parsed_ctd = parse_input_ctds(None, [ctd_tmp.name], xml_tmp.name, "xml")
        ctd_args = CTDConverterArgs(
            input_files=[ctd_tmp.name],
            output_destination=xml_tmp.name,
            macros_files=["macros.xml"],
            skip_tools_file="aux/tools_blacklist.txt",
            formats_file="aux/filetypes.txt",
            # tool_conf_destination = "tool.conf",
            hardcoded_parameters="aux/hardcoded_params.json",
            tool_version="3.1",
            test_only=True,
            test_unsniffable=[
                "csv",
                "tsv",
                "txt",
                "dta",
                "dta2d",
                "edta",
                "mrm",
                "splib",
            ],
            test_condition=["compare=sim_size", "delta_frac=0.7"],
        )
        ctd_args.parameter_hardcoder = parse_hardcoded_parameters(
            ctd_args.hardcoded_parameters
        )
        convert_models(ctd_args, parsed_ctd)
        # read the generated xml with a handle of its own that is closed
        # again (instead of rebinding xml_tmp to a handle that stays open)
        with open(xml_tmp.name, "r") as xml_fh:
            test.extend(xml_fh)

    return "".join(test)
283 | |
284 | |
# command line interface: both arguments are needed by the code below, so
# argparse reports a missing one instead of a TypeError later on
parser = argparse.ArgumentParser(description="Create Galaxy tests for a OpenMS tools")
parser.add_argument("--id", dest="id", required=True, help="tool id")
parser.add_argument("--cmake", nargs="+", required=True, help="OpenMS test CMake files")
args = parser.parse_args()
sys.stderr.write(f"generate tests for {args.id}\n")

re_comment = re.compile("#.*")
re_empty_prefix = re.compile(r"^\s*")
re_empty_suffix = re.compile(r"\s*$")
# the tool id is escaped such that it is matched literally
re_add_test = re.compile(r"add_test\(\"(TOPP|UTILS)_.*/" + re.escape(args.id))
re_diff = re.compile(r"\$\{DIFF\}.* -in1 ([^ ]+) -in2 ([^ ]+)")
failing_tests = get_failing_tests(args.cmake)
tests = []

# process the given CMake files and compile lists of
# - test lines .. essentially add_test(...)
# - and pairs of files that are diffed
jline = ""
test_lines = []
diff_pairs = {}
for cmake in args.cmake:
    with open(cmake) as cmake_fh:
        for line in cmake_fh:
            # remove comments, empty prefixes and suffixes
            line = re_comment.sub("", line)
            line = re_empty_prefix.sub("", line)
            line = re_empty_suffix.sub("", line)
            # skip empty lines
            if line == "":
                continue

            # join test statements that are split over multiple lines:
            # everything up to the line ending with the closing parenthesis
            # is accumulated (also for statements with more than two lines)
            if line.endswith(")"):
                jline += " " + line[:-1]
            else:
                jline += " " + line
                continue
            line, jline = jline.strip(), ""
            match = re_diff.search(line)
            if match:
                # only the file names are of interest
                in1 = match.group(1).split("/")[-1]
                in2 = match.group(2).split("/")[-1]
                if in1 != in2:
                    diff_pairs[in1] = in2
            elif re_add_test.match(line):
                test_lines.append(line)

for line in test_lines:
    test = process_test_line(args.id, line, failing_tests, SKIP_LIST, diff_pairs)
    if test:
        tests.append(test)

# output all tests of the tool wrapped in a single xml macro
tests = "\n".join(tests)
print(
    f"""
<xml name="autotest_{args.id}">
{tests}
</xml>
"""
)