comparison env/lib/python3.7/site-packages/planemo/runnable.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """Describe artifacts that can be run, tested, and linted."""
2
3 from __future__ import absolute_import
4
5 import abc
6 import collections
7 import os
8 from distutils.dir_util import copy_tree
9
10 import aenum
11 import six
12 import yaml
13 from galaxy.tool_util.cwl.parser import workflow_proxy
14 from galaxy.tool_util.loader_directory import (
15 is_a_yaml_with_class,
16 looks_like_a_cwl_artifact,
17 looks_like_a_data_manager_xml,
18 looks_like_a_tool_cwl,
19 looks_like_a_tool_xml,
20 )
21 from galaxy.tool_util.parser import get_tool_source
22
23 from planemo.exit_codes import EXIT_CODE_UNKNOWN_FILE_TYPE, ExitCodeException
24 from planemo.galaxy.workflows import describe_outputs
25 from planemo.io import error
26 from planemo.test import check_output
27
28 TEST_SUFFIXES = [
29 "-tests", "_tests", "-test", "_test"
30 ]
31 TEST_EXTENSIONS = [".yml", ".yaml", ".json"]
32
33 TEST_FILE_NOT_LIST_MESSAGE = ("Invalid test definition file [%s] - file must "
34 "contain a list of tests")
35 TEST_FIELD_MISSING_MESSAGE = ("Invalid test definition [test #%d in %s] -"
36 "defintion must field [%s].")
37
38
39 RunnableType = aenum.Enum(
40 "RunnableType", 'galaxy_tool galaxy_datamanager galaxy_workflow cwl_tool cwl_workflow directory'
41 )
42
43
44 @property
45 def _runnable_type_has_tools(runnable_type):
46 return runnable_type.name in ["galaxy_tool", "galaxy_datamanager", "cwl_tool", "directory"]
47
48
49 @property
50 def _runnable_type_is_single_artifact(runnable_type):
51 return runnable_type.name not in ["directory"]
52
53
54 @property
55 def _runnable_type_test_data_in_parent_dir(runnable_type):
56 return runnable_type.name in ["galaxy_datamanager"]
57
58
59 RunnableType.has_tools = _runnable_type_has_tools
60 RunnableType.is_single_artifact = _runnable_type_is_single_artifact
61 RunnableType.test_data_in_parent_dir = _runnable_type_test_data_in_parent_dir
62
63 _Runnable = collections.namedtuple("Runnable", ["path", "type"])
64
65
66 class Runnable(_Runnable):
67
68 @property
69 def test_data_search_path(self):
70 if self.type.name in ['galaxy_datamanager']:
71 return os.path.join(os.path.dirname(self.path), os.path.pardir)
72 else:
73 return self.path
74
75 @property
76 def tool_data_search_path(self):
77 return self.test_data_search_path
78
79 @property
80 def data_manager_conf_path(self):
81 if self.type.name in ['galaxy_datamanager']:
82 return os.path.join(os.path.dirname(self.path), os.pardir, 'data_manager_conf.xml')
83
84 @property
85 def has_tools(self):
86 return _runnable_delegate_attribute('has_tools')
87
88 @property
89 def is_single_artifact(self):
90 return _runnable_delegate_attribute('is_single_artifact')
91
92
93 def _runnable_delegate_attribute(attribute):
94
95 @property
96 def getter(runnable):
97 return getattr(runnable.type, attribute)
98
99 return getter
100
101
102 def _copy_runnable_tree(path, runnable_type, temp_path):
103 dir_to_copy = None
104 if runnable_type in {RunnableType.galaxy_tool, RunnableType.cwl_tool}:
105 dir_to_copy = os.path.dirname(path)
106 path = os.path.join(temp_path, os.path.basename(path))
107 elif runnable_type == RunnableType.directory:
108 dir_to_copy = path
109 path = temp_path
110 elif runnable_type == RunnableType.galaxy_datamanager:
111 dir_to_copy = os.path.join(os.path.dirname(path), os.pardir)
112 path_to_data_manager_tool = os.path.relpath(path, dir_to_copy)
113 path = os.path.join(temp_path, path_to_data_manager_tool)
114 if dir_to_copy:
115 copy_tree(dir_to_copy, temp_path)
116 return path
117
118
119 def for_path(path, temp_path=None):
120 """Produce a class:`Runnable` for supplied path."""
121 runnable_type = None
122 if os.path.isdir(path):
123 runnable_type = RunnableType.directory
124 elif looks_like_a_tool_cwl(path):
125 runnable_type = RunnableType.cwl_tool
126 elif looks_like_a_data_manager_xml(path):
127 runnable_type = RunnableType.galaxy_datamanager
128 elif looks_like_a_tool_xml(path):
129 runnable_type = RunnableType.galaxy_tool
130 elif is_a_yaml_with_class(path, ["GalaxyWorkflow"]):
131 runnable_type = RunnableType.galaxy_workflow
132 elif path.endswith(".ga"):
133 runnable_type = RunnableType.galaxy_workflow
134 elif looks_like_a_cwl_artifact(path, ["Workflow"]):
135 runnable_type = RunnableType.cwl_workflow
136
137 if runnable_type is None:
138 error("Unable to determine runnable type for path [%s]" % path)
139 raise ExitCodeException(EXIT_CODE_UNKNOWN_FILE_TYPE)
140
141 if temp_path:
142 path = _copy_runnable_tree(path, runnable_type, temp_path)
143
144 return Runnable(path, runnable_type)
145
146
147 def for_paths(paths, temp_path=None):
148 """Return a specialized list of Runnable objects for paths."""
149 return [for_path(path, temp_path=temp_path) for path in paths]
150
151
152 def cases(runnable):
153 """Build a `list` of :class:`TestCase` objects for specified runnable."""
154 cases = []
155
156 tests_path = _tests_path(runnable)
157 if tests_path is None:
158 if runnable.type == RunnableType.galaxy_tool:
159 tool_source = get_tool_source(runnable.path)
160 test_dicts = tool_source.parse_tests_to_dict()
161 tool_id = tool_source.parse_id()
162 tool_version = tool_source.parse_version()
163 for i, test_dict in enumerate(test_dicts.get("tests", [])):
164 cases.append(ExternalGalaxyToolTestCase(runnable, tool_id, tool_version, i, test_dict))
165 return cases
166
167 tests_directory = os.path.abspath(os.path.dirname(tests_path))
168
169 def normalize_to_tests_path(path):
170 if not os.path.isabs(path):
171 absolute_path = os.path.join(tests_directory, path)
172 else:
173 absolute_path = path
174 return os.path.normpath(absolute_path)
175
176 with open(tests_path, "r") as f:
177 tests_def = yaml.safe_load(f)
178
179 if not isinstance(tests_def, list):
180 message = TEST_FILE_NOT_LIST_MESSAGE % tests_path
181 raise Exception(message)
182
183 for i, test_def in enumerate(tests_def):
184 if "job" not in test_def:
185 message = TEST_FIELD_MISSING_MESSAGE % (
186 i + 1, tests_path, "job"
187 )
188 raise Exception(message)
189 job_def = test_def["job"]
190 if isinstance(job_def, dict):
191 job_path = None
192 job = job_def
193 else:
194 job_path = normalize_to_tests_path(job_def)
195 job = None
196
197 doc = test_def.get("doc", None)
198 output_expectations = test_def.get("outputs", {})
199 case = TestCase(
200 runnable=runnable,
201 tests_directory=tests_directory,
202 output_expectations=output_expectations,
203 index=i,
204 job_path=job_path,
205 job=job,
206 doc=doc,
207 )
208 cases.append(case)
209
210 return cases
211
212
213 class AbstractTestCase(object):
214 """Description of a test case for a runnable.
215 """
216
217 __metaclass__ = abc.ABCMeta
218
219 def structured_test_data(self, run_response):
220 """Result of executing this test case - a "structured_data" dict.
221
222 :rtype: dict
223 :return:
224 For example::
225
226 {
227 "id": "",
228 "has_data": true,
229 "data": {
230 "status": "success", // error, skip,
231 "job": {
232 "command_line": "cat moo",
233 "stdout": "",
234 "stderr": ""
235 },
236 "output_problems": [],
237 "execution_problem": "",
238 "inputs" = {},
239 "problem_log": ""
240 }
241 }
242 """
243
244
245 class TestCase(AbstractTestCase):
246 """Describe an abstract test case for a specified runnable."""
247
248 def __init__(self, runnable, tests_directory, output_expectations, job_path, job, index, doc):
249 """Construct TestCase object from required attributes."""
250 self.runnable = runnable
251 self.job_path = job_path
252 self.job = job
253 self.output_expectations = output_expectations
254 self.tests_directory = tests_directory
255 self.index = index
256 self.doc = doc
257
258 def __repr__(self):
259 return 'TestCase (%s) for runnable (%s) with job (%s) and expected outputs (%s) in directory (%s) with id (%s)' % \
260 (self.doc, self.runnable, self.job, self.output_expectations, self.tests_directory, self.index)
261
262 def structured_test_data(self, run_response):
263 """Check a test case against outputs dictionary.
264 """
265 output_problems = []
266 if run_response.was_successful:
267 outputs_dict = run_response.outputs_dict
268 execution_problem = None
269 for output_id, output_test in self.output_expectations.items():
270 if output_id not in outputs_dict:
271 message = "Expected output [%s] not found in results." % output_id
272 output_problems.append(message)
273 continue
274
275 output_value = outputs_dict[output_id]
276 output_problems.extend(
277 self._check_output(output_id, output_value, output_test)
278 )
279 if output_problems:
280 status = "failure"
281 else:
282 status = "success"
283 else:
284 execution_problem = run_response.error_message
285 status = "error"
286 data_dict = dict(
287 status=status
288 )
289 if status != "success":
290 data_dict["output_problems"] = output_problems
291 data_dict["execution_problem"] = execution_problem
292 log = run_response.log
293 if log is not None:
294 data_dict["problem_log"] = log
295 job_info = run_response.job_info
296 if job_info is not None:
297 data_dict["job"] = job_info
298 data_dict["inputs"] = self._job
299 return dict(
300 id=("%s_%s" % (self._test_id, self.index)),
301 has_data=True,
302 data=data_dict,
303 )
304
305 @property
306 def _job(self):
307 if self.job_path is not None:
308 with open(self.job_path, "r") as f:
309 return f.read()
310 else:
311 return self.job
312
313 def _check_output(self, output_id, output_value, output_test):
314 output_problems = []
315 if not isinstance(output_test, dict):
316 if output_test != output_value:
317 template = "Output [%s] value [%s] does not match expected value [%s]."
318 message = template % (output_id, output_value, output_test)
319 output_problems.append(message)
320 else:
321 if not isinstance(output_value, dict):
322 output_problems.append("Expected file properties for output [%s]" % output_id)
323 return
324 if "path" not in output_value and "location" in output_value:
325 assert output_value["location"].startswith("file://")
326 output_value["path"] = output_value["location"][len("file://"):]
327 if "path" not in output_value:
328 output_problems.append("No path specified for expected output file [%s]" % output_id)
329 return
330
331 output_problems.extend(
332 check_output(
333 self.runnable,
334 output_value,
335 output_test,
336 # TODO: needs kwds in here...
337 )
338 )
339
340 return output_problems
341
342 @property
343 def _test_id(self):
344 if self.runnable.type in [
345 RunnableType.cwl_tool,
346 RunnableType.galaxy_tool,
347 ]:
348 return get_tool_source(self.runnable.path).parse_id()
349 else:
350 return os.path.basename(self.runnable.path)
351
352
353 class ExternalGalaxyToolTestCase(AbstractTestCase):
354 """Special class of AbstractCase that doesn't use job_path but uses test data from a Galaxy server.
355 """
356
357 def __init__(self, runnable, tool_id, tool_version, test_index, test_dict):
358 """Construct TestCase object from required attributes."""
359 self.runnable = runnable
360 self.tool_id = tool_id
361 self.tool_version = tool_version
362 self.test_index = test_index
363 self.test_dict = test_dict
364
365 def structured_test_data(self, run_response):
366 """Just return the structured_test_data generated from galaxy-tool-util for this test variant.
367 """
368 return run_response
369
370
371 def _tests_path(runnable):
372 if not runnable.is_single_artifact:
373 raise NotImplementedError("Tests for directories are not yet implemented.")
374
375 runnable_path = runnable.path
376 base, _ = os.path.splitext(runnable_path)
377
378 for test_suffix in TEST_SUFFIXES:
379 for test_extension in TEST_EXTENSIONS:
380 test_path = base + test_suffix + test_extension
381 if os.path.exists(test_path):
382 return test_path
383
384 return None
385
386
387 def get_outputs(runnable):
388 """Return a list of :class:`RunnableOutput` objects for this runnable."""
389 if not runnable.is_single_artifact:
390 raise NotImplementedError("Cannot generate outputs for a directory.")
391 if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
392 tool_source = get_tool_source(runnable.path)
393 # TODO: do something with collections at some point
394 output_datasets, _ = tool_source.parse_outputs(None)
395 outputs = [ToolOutput(o) for o in output_datasets.values()]
396 return outputs
397 elif runnable.type == RunnableType.galaxy_workflow:
398 workflow_outputs = describe_outputs(runnable.path)
399 return [GalaxyWorkflowOutput(o) for o in workflow_outputs]
400 elif runnable.type == RunnableType.cwl_workflow:
401 workflow = workflow_proxy(runnable.path, strict_cwl_validation=False)
402 return [CwlWorkflowOutput(label) for label in workflow.output_labels]
403 else:
404 raise NotImplementedError("Getting outputs for this artifact type is not yet supported.")
405
406
407 class RunnableOutput(object):
408 """Description of a single output of an execution of a Runnable."""
409
410 __metaclass__ = abc.ABCMeta
411
412 @abc.abstractproperty
413 def get_id(self):
414 """An identifier that describes this output."""
415
416
417 class ToolOutput(RunnableOutput):
418
419 def __init__(self, tool_output):
420 self._tool_output = tool_output
421
422 def get_id(self):
423 return self._tool_output.name
424
425
426 class GalaxyWorkflowOutput(RunnableOutput):
427
428 def __init__(self, workflow_output):
429 self._workflow_output = workflow_output
430
431 def get_id(self):
432 return self._workflow_output.label
433
434 @property
435 def workflow_output(self):
436 return self._workflow_output
437
438
439 class CwlWorkflowOutput(RunnableOutput):
440
441 def __init__(self, label):
442 self._label = label
443
444 def get_id(self):
445 return self._label
446
447
448 class RunResponse(object):
449 """Description of an attempt for an engine to execute a Runnable."""
450
451 __metaclass__ = abc.ABCMeta
452
453 @abc.abstractproperty
454 def was_successful(self):
455 """Indicate whether an error was encountered while executing this runnble.
456
457 If successful, response should conform to the SuccessfulRunResponse interface,
458 otherwise it will conform to the ErrorRunResponse interface.
459 """
460
461 @abc.abstractproperty
462 def job_info(self):
463 """If job information is available, return as dictionary."""
464
465 @abc.abstractproperty
466 def log(self):
467 """If engine related log is available, return as text data."""
468
469
470 class SuccessfulRunResponse(RunResponse):
471 """Description of the results of an engine executing a Runnable."""
472
473 __metaclass__ = abc.ABCMeta
474
475 def was_successful(self):
476 """Return `True` to indicate this run was successful."""
477 return True
478
479 @abc.abstractproperty
480 def outputs_dict(self):
481 """Return a dict of output descriptions."""
482
483
484 @six.python_2_unicode_compatible
485 class ErrorRunResponse(RunResponse):
486 """Description of an error while attempting to execute a Runnable."""
487
488 def __init__(self, error_message, job_info=None, log=None):
489 """Create an ErrorRunResponse with specified error message."""
490 self._error_message = error_message
491 self._job_info = job_info
492 self._log = log
493
494 @property
495 def error_message(self):
496 """Error message describing the problem with execution of the runnable."""
497 return self._error_message
498
499 @property
500 def was_successful(self):
501 """Return `False` to indicate this run was successful."""
502 return False
503
504 @property
505 def job_info(self):
506 """Return potentially null stored `job_info` dict."""
507 return self._job_info
508
509 @property
510 def log(self):
511 """Return potentially null stored `log` text."""
512 return self._log
513
514 def __str__(self):
515 """Print a helpful error description of run."""
516 message = "Run failed with message [%s]" % self.error_message
517 log = self.log
518 if log:
519 message += " and log [%s]" % log
520 return message
521
522
523 __all__ = (
524 "cases",
525 "ErrorRunResponse",
526 "for_path",
527 "for_paths",
528 "get_outputs",
529 "Runnable",
530 "RunnableType",
531 "RunResponse",
532 "RunnableOutput",
533 "SuccessfulRunResponse",
534 "TestCase",
535 )