diff env/lib/python3.7/site-packages/gxformat2/lint.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/gxformat2/lint.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,180 @@
+"""Workflow linting entry point - main script."""
+import argparse
+import os
+import sys
+
+from gxformat2._scripts import ensure_format2
+from gxformat2._yaml import ordered_load
+from gxformat2.linting import LintContext
+from gxformat2.markdown_parse import validate_galaxy_markdown
+
+EXIT_CODE_SUCCESS = 0
+EXIT_CODE_LINT_FAILED = 1
+EXIT_CODE_FORMAT_ERROR = 2
+EXIT_CODE_FILE_PARSE_FAILED = 3
+
+LINT_FAILED_NO_OUTPUTS = "Workflow contained no outputs"
+LINT_FAILED_OUTPUT_NO_LABEL = "Workflow contained output without a label"
+
+
+def ensure_key(lint_context, has_keys, key, has_class=None, has_value=None):
+    if key not in has_keys:
+        lint_context.error("expected to find key [{key}] but absent", key=key)
+        return None
+
+    value = has_keys[key]
+    return ensure_key_has_value(lint_context, has_keys, key, value, has_class=has_class, has_value=has_value)
+
+
+def ensure_key_if_present(lint_context, has_keys, key, default=None, has_class=None):
+    if key not in has_keys:
+        return default
+
+    value = has_keys[key]
+    return ensure_key_has_value(lint_context, has_keys, key, value, has_class=has_class, has_value=None)
+
+
+def ensure_key_has_value(lint_context, has_keys, key, value, has_class=None, has_value=None):
+    if has_class is not None and not isinstance(value, has_class):
+        lint_context.error("expected value [{value}] with key [{key}] to be of class {clazz}", key=key, value=value, clazz=has_class)
+    if has_value is not None and value != has_value:
+        lint_context.error("expected value [{value}] with key [{key}] to be {expected_value}", key=key, value=value, expected_value=has_value)
+    return value
+
+
+def _lint_step_errors(lint_context, step):
+    step_errors = step.get("errors")
+    if step_errors is not None:
+        lint_context.warn("tool step contains error indicated during Galaxy export - %s" % step_errors)
+
+
+def lint_ga(lint_context, workflow_dict, path=None):
+    """Lint a native/legacy style Galaxy workflow and populate the corresponding LintContext."""
+    ensure_key(lint_context, workflow_dict, "format-version", has_value="0.1")
+    ensure_key(lint_context, workflow_dict, "a_galaxy_workflow", has_value="true")
+
+    native_steps = ensure_key(lint_context, workflow_dict, "steps", has_class=dict) or {}
+
+    found_outputs = False
+    found_output_without_label = False
+    for order_index_str, step in native_steps.items():
+        if not order_index_str.isdigit():
+            lint_context.error("expected step_key to be integer not [{value}]", value=order_index_str)
+
+        workflow_outputs = ensure_key_if_present(lint_context, step, "workflow_outputs", default=[], has_class=list)
+        for workflow_output in workflow_outputs:
+            found_outputs = True
+
+            if not workflow_output.get("label"):
+                found_output_without_label = True
+
+        step_type = step.get("type")
+        if step_type == "subworkflow":
+            subworkflow = ensure_key(lint_context, step, "subworkflow", has_class=dict)
+            lint_ga(lint_context, subworkflow)
+
+        _lint_step_errors(lint_context, step)
+        _lint_tool_if_present(lint_context, step)
+
+    _validate_report(lint_context, workflow_dict)
+    if not found_outputs:
+        lint_context.warn(LINT_FAILED_NO_OUTPUTS)
+
+    if found_output_without_label:
+        lint_context.warn(LINT_FAILED_OUTPUT_NO_LABEL)
+
+    _lint_training(lint_context, workflow_dict)
+
+
+def lint_format2(lint_context, workflow_dict, path=None):
+    """Lint a Format 2 Galaxy workflow and populate the corresponding LintContext."""
+    from gxformat2.schema.v19_09 import load_document
+    from schema_salad.exceptions import SchemaSaladException
+    try:
+        load_document("file://" + os.path.normpath(path))
+    except SchemaSaladException as e:
+        lint_context.error("Validation failed " + str(e))
+
+    steps = ensure_key_if_present(lint_context, workflow_dict, 'steps', default={}, has_class=dict)
+    for key, step in steps.items():
+        _lint_step_errors(lint_context, step)
+        _lint_tool_if_present(lint_context, step)
+
+    _validate_report(lint_context, workflow_dict)
+    _lint_training(lint_context, workflow_dict)
+
+
+def _lint_tool_if_present(lint_context, step_dict):
+    tool_id = step_dict.get('tool_id')
+    if tool_id and 'testtoolshed' in tool_id:
+        lint_context.warn('Step references a tool from the test tool shed, this should be replaced with a production tool')
+
+
+def _validate_report(lint_context, workflow_dict):
+    report_dict = ensure_key_if_present(lint_context, workflow_dict, "report", default=None, has_class=dict)
+    if report_dict is not None:
+        markdown = ensure_key(lint_context, report_dict, "markdown", has_class=str)
+        if isinstance(markdown, str):
+            try:
+                validate_galaxy_markdown(markdown)
+            except ValueError as e:
+                lint_context.error("Report markdown validation failed [%s]" % e)
+
+
+def _lint_training(lint_context, workflow_dict):
+    if lint_context.training_topic is None:
+        return
+
+    if "tags" not in workflow_dict:
+        lint_context.warn("Missing tag(s).")
+    else:
+        tags = workflow_dict["tags"]
+        if lint_context.training_topic not in tags:
+            lint_context.warn("Missing expected training topic (%s) as workflow tag." % lint_context.training_topic)
+    # Move up into individual lints - all workflows should have docs.
+    format2_dict = ensure_format2(workflow_dict)
+    if "doc" not in format2_dict:
+        lint_context.warn("Missing workflow documentation (annotation or doc element)")
+    elif not format2_dict["doc"]:
+        lint_context.warn("Empty workflow documentation (annotation or doc element)")
+
+
+def main(argv=None):
+    """Script entry point for linting workflows."""
+    if argv is None:
+        argv = sys.argv
+    args = _parser().parse_args(argv[1:])
+    path = args.path
+    with open(path, "r") as f:
+        try:
+            workflow_dict = ordered_load(f)
+        except Exception:
+            return EXIT_CODE_FILE_PARSE_FAILED
+    workflow_class = workflow_dict.get("class")
+    lint_func = lint_format2 if workflow_class == "GalaxyWorkflow" else lint_ga
+    lint_context = LintContext(training_topic=args.training_topic)
+    lint_func(lint_context, workflow_dict, path=path)
+    lint_context.print_messages()
+    if lint_context.found_errors:
+        return EXIT_CODE_FORMAT_ERROR
+    elif lint_context.found_warns:
+        return EXIT_CODE_LINT_FAILED
+    else:
+        return EXIT_CODE_SUCCESS
+
+
+def _parser():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--training-topic",
+                        required=False,
+                        help='If this is a training workflow, specify a training topic.')
+    parser.add_argument('path', metavar='PATH', type=str,
+                        help='workflow path')
+    return parser
+
+
+if __name__ == "__main__":
+    sys.exit(main())
+
+
+__all__ = ('main', 'lint_format2', 'lint_ga')