diff env/lib/python3.7/site-packages/cwltool/checker.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/checker.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,248 @@
+"""Static checking of CWL workflow connectivity."""
+from collections import namedtuple
+from typing import Any, Dict, List, MutableMapping, MutableSequence, Optional
+
+import six
+from schema_salad import validate
+from schema_salad.sourceline import SourceLine, bullets, strip_dup_lineno
+from typing_extensions import Text  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .errors import WorkflowException
+from .loghandler import _logger
+from .process import shortname
+from .utils import json_dumps
+
+
+def _get_type(tp):
+    # type: (Any) -> Any
+    if isinstance(tp, MutableMapping):
+        if tp.get("type") not in ("array", "record", "enum"):
+            return tp["type"]
+    return tp
+
def check_types(srctype, sinktype, linkMerge, valueFrom):
    # type: (Any, Any, Optional[Text], Optional[Text]) -> Text
    """
    Classify a source/sink connection as "pass", "warning", or "exception".

    A valueFrom expression may transform the value arbitrarily, so its
    presence always passes.  Without a linkMerge method the raw types are
    compared directly (strict match passes, loose match warns); with one,
    the effective merged source type is compared instead.
    """
    if valueFrom is not None:
        return "pass"
    if linkMerge is None:
        if can_assign_src_to_sink(srctype, sinktype, strict=True):
            return "pass"
        if can_assign_src_to_sink(srctype, sinktype, strict=False):
            return "warning"
        return "exception"
    if linkMerge == "merge_nested":
        # merge_nested wraps each source value in an array.
        nested = {"items": _get_type(srctype), "type": "array"}
        return check_types(nested, _get_type(sinktype), None, None)
    if linkMerge == "merge_flattened":
        flattened = merge_flatten_type(_get_type(srctype))
        return check_types(flattened, _get_type(sinktype), None, None)
    raise WorkflowException(u"Unrecognized linkMerge enum '{}'".format(linkMerge))
+
+
def merge_flatten_type(src):
    # type: (Any) -> Any
    """Return the merge_flattened form of the source type.

    Lists are flattened element-wise; an existing array schema is kept
    as-is; anything else is wrapped into an array schema.
    """
    if isinstance(src, MutableSequence):
        return [merge_flatten_type(member) for member in src]
    already_array = isinstance(src, MutableMapping) and src.get("type") == "array"
    return src if already_array else {"items": src, "type": "array"}
+
+
def can_assign_src_to_sink(src, sink, strict=False):  # type: (Any, Any, bool) -> bool
    """
    Check for identical type specifications, ignoring extra keys like inputBinding.

    src: admissible source types
    sink: admissible sink types

    In non-strict comparison, at least one source type must match one sink type.
    In strict comparison, all source types must match at least one sink type.
    """
    if src == "Any" or sink == "Any":
        return True
    if isinstance(src, MutableMapping) and isinstance(sink, MutableMapping):
        if strict and sink.get("not_connected"):
            return False
        src_type = src["type"]
        sink_type = sink["type"]
        if src_type == "array" and sink_type == "array":
            return can_assign_src_to_sink(src["items"], sink["items"], strict)
        if src_type == "record" and sink_type == "record":
            return _compare_records(src, sink, strict)
        if src_type == "File" and sink_type == "File":
            if strict:
                # In strict mode every secondaryFile the sink expects must
                # appear among the source's declared secondaryFiles.
                declared = src.get("secondaryFiles", [])
                if any(wanted not in declared
                       for wanted in sink.get("secondaryFiles", [])):
                    return False
            return True
        return can_assign_src_to_sink(src_type, sink_type, strict)
    if isinstance(src, MutableSequence):
        # NOTE: the recursive calls below deliberately use the default
        # (non-strict) mode, mirroring the original element-wise check.
        if strict:
            return all(can_assign_src_to_sink(candidate, sink)
                       for candidate in src)
        return any(can_assign_src_to_sink(candidate, sink)
                   for candidate in src)
    if isinstance(sink, MutableSequence):
        return any(can_assign_src_to_sink(src, accepted)
                   for accepted in sink)
    return bool(src == sink)
+
+
def _compare_records(src, sink, strict=False):
    # type: (MutableMapping[Text, Any], MutableMapping[Text, Any], bool) -> bool
    """
    Compare two records, ensuring they have compatible fields.

    This handles normalizing record names, which will be relative to workflow
    step, so that they can be compared.

    Returns True when every field the sink declares can be fed from the
    source's field of the same (short) name; a source field missing
    entirely is treated as "null".
    """
    def _rec_fields(rec):  # type: (MutableMapping[Text, Any]) -> MutableMapping[Text, Any]
        # Map each field's short (step-relative) name to its declared type.
        out = {}
        for field in rec["fields"]:
            name = shortname(field["name"])
            out[name] = field["type"]
        return out

    srcfields = _rec_fields(src)
    sinkfields = _rec_fields(sink)
    # Plain dict iteration: this is a Python 3 tree, so six.iterkeys() is
    # an unnecessary compatibility shim.
    for key in sinkfields:
        if (not can_assign_src_to_sink(
                srcfields.get(key, "null"), sinkfields.get(key, "null"), strict)
                and sinkfields.get(key) is not None):
            _logger.info("Record comparison failure for %s and %s\n"
                         "Did not match fields for %s: %s and %s",
                         src["name"], sink["name"], key, srcfields.get(key),
                         sinkfields.get(key))
            return False
    return True
+
def missing_subset(fullset, subset): # type: (List[Any], List[Any]) -> List[Any]
    """Return the members of *subset* that do not appear in *fullset*."""
    return [item for item in subset if item not in fullset]
+
def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs, param_to_step):
    # type: (List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], Dict[Text, Dict[Text, Any]]) -> None
    """Check if all source and sink types of a workflow are compatible before run time."""
    # source parameters: workflow_inputs and step_outputs
    # sink parameters: step_inputs and workflow_outputs

    # make a dictionary of source parameters, indexed by the "id" field
    src_parms = workflow_inputs + step_outputs
    src_dict = {}
    for parm in src_parms:
        src_dict[parm["id"]] = parm

    # Classify every connection; each bucket holds SrcSink tuples.
    step_inputs_val = check_all_types(src_dict, step_inputs, "source")
    workflow_outputs_val = check_all_types(src_dict, workflow_outputs, "outputSource")

    warnings = step_inputs_val["warning"] + workflow_outputs_val["warning"]
    exceptions = step_inputs_val["exception"] + workflow_outputs_val["exception"]

    # Render each warning/exception into a SourceLine-annotated message.
    warning_msgs = []
    exception_msgs = []
    for warning in warnings:
        src = warning.src
        sink = warning.sink
        linkMerge = warning.linkMerge
        # Patterns the sink requires (optional entries excluded) vs. the
        # patterns the source declares.
        # NOTE(review): assumes secondaryFiles entries are mappings with a
        # "pattern" key (CWL v1.1+ form) — confirm against the loader.
        sinksf = sorted([p["pattern"] for p in sink.get("secondaryFiles", []) if p.get("required", True)])
        srcsf = sorted([p["pattern"] for p in src.get("secondaryFiles", [])])
        # Every secondaryFile required by the sink, should be declared
        # by the source
        missing = missing_subset(srcsf, sinksf)
        if missing:
            # Multi-part message pointing at both the source and sink
            # declarations, with suggested resolutions.
            msg1 = "Parameter '%s' requires secondaryFiles %s but" % (shortname(sink["id"]), missing)
            msg3 = SourceLine(src, "id").makeError(
                "source '%s' does not provide those secondaryFiles." % (shortname(src["id"])))
            msg4 = SourceLine(src.get("_tool_entry", src), "secondaryFiles").makeError("To resolve, add missing secondaryFiles patterns to definition of '%s' or" % (shortname(src["id"])))
            msg5 = SourceLine(sink.get("_tool_entry", sink), "secondaryFiles").makeError("mark missing secondaryFiles in definition of '%s' as optional." % shortname(sink["id"]))
            msg = SourceLine(sink).makeError("%s\n%s" % (msg1, bullets([msg3, msg4, msg5], "  ")))
        elif sink.get("not_connected"):
            # The sink names a parameter the step's tool does not declare;
            # list the parameters it does accept.
            msg = SourceLine(sink, "type").makeError(
                "'%s' is not an input parameter of %s, expected %s"
                % (shortname(sink["id"]), param_to_step[sink["id"]]["run"],
                   ", ".join(shortname(s["id"])
                             for s in param_to_step[sink["id"]]["inputs"]
                             if not s.get("not_connected"))))
        else:
            # Generic possible type mismatch between source and sink.
            msg = SourceLine(src, "type").makeError(
                "Source '%s' of type %s may be incompatible"
                % (shortname(src["id"]), json_dumps(src["type"]))) + "\n" + \
                SourceLine(sink, "type").makeError(
                    "  with sink '%s' of type %s"
                    % (shortname(sink["id"]), json_dumps(sink["type"])))
            if linkMerge is not None:
                msg += "\n" + SourceLine(sink).makeError("  source has linkMerge method %s" % linkMerge)

        warning_msgs.append(msg)
    for exception in exceptions:
        src = exception.src
        sink = exception.sink
        linkMerge = exception.linkMerge
        # Definite type incompatibility between source and sink.
        msg = SourceLine(src, "type").makeError(
            "Source '%s' of type %s is incompatible"
            % (shortname(src["id"]), json_dumps(src["type"]))) + "\n" + \
            SourceLine(sink, "type").makeError(
                "  with sink '%s' of type %s"
                % (shortname(sink["id"]), json_dumps(sink["type"])))
        if linkMerge is not None:
            msg += "\n" + SourceLine(sink).makeError("  source has linkMerge method %s" % linkMerge)
        exception_msgs.append(msg)

    # A non-optional step input with no source, default, or valueFrom can
    # never receive a value, so treat it as a hard error.
    for sink in step_inputs:
        if ('null' != sink["type"] and 'null' not in sink["type"]
                and "source" not in sink and "default" not in sink and "valueFrom" not in sink):
            msg = SourceLine(sink).makeError(
                "Required parameter '%s' does not have source, default, or valueFrom expression"
                % shortname(sink["id"]))
            exception_msgs.append(msg)

    all_warning_msg = strip_dup_lineno("\n".join(warning_msgs))
    all_exception_msg = strip_dup_lineno("\n".join(exception_msgs))

    # Warnings are logged; exceptions abort validation.
    if warnings:
        _logger.warning("Workflow checker warning:\n%s", all_warning_msg)
    if exceptions:
        raise validate.ValidationException(all_exception_msg)
+
+
+SrcSink = namedtuple("SrcSink", ["src", "sink", "linkMerge"])
+
def check_all_types(src_dict, sinks, sourceField):
    # type: (Dict[Text, Any], List[Dict[Text, Any]], Text) -> Dict[Text, List[SrcSink]]
    """
    Given a list of sinks, check if their types match with the types of their sources.

    sourceField is either "source" or "outputSource".  Returns the
    problematic connections, bucketed under "warning" and "exception".
    """
    validation = {"warning": [], "exception": []}  # type: Dict[Text, List[SrcSink]]
    for sink in sinks:
        if sourceField not in sink:
            continue
        valueFrom = sink.get("valueFrom")
        sources = sink[sourceField]
        if isinstance(sources, MutableSequence):
            srcs_of_sink = [src_dict[parm_id] for parm_id in sources]
            # Multiple sources default to merge_nested unless overridden.
            default_merge = "merge_nested" if len(sources) > 1 else None
            linkMerge = sink.get("linkMerge", default_merge)
        else:
            srcs_of_sink = [src_dict[sources]]
            linkMerge = None
        for src in srcs_of_sink:
            verdict = check_types(src, sink, linkMerge, valueFrom)
            if verdict != "pass":
                validation[verdict].append(SrcSink(src, sink, linkMerge))
    return validation