diff env/lib/python3.7/site-packages/cwltool/checker.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/checker.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,248 +0,0 @@
-"""Static checking of CWL workflow connectivity."""
-from collections import namedtuple
-from typing import Any, Dict, List, MutableMapping, MutableSequence, Optional
-
-import six
-from schema_salad import validate
-from schema_salad.sourceline import SourceLine, bullets, strip_dup_lineno
-from typing_extensions import Text  # pylint: disable=unused-import
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from .errors import WorkflowException
-from .loghandler import _logger
-from .process import shortname
-from .utils import json_dumps
-
-
-def _get_type(tp):
-    # type: (Any) -> Any
-    if isinstance(tp, MutableMapping):
-        if tp.get("type") not in ("array", "record", "enum"):
-            return tp["type"]
-    return tp
-
def check_types(srctype, sinktype, linkMerge, valueFrom):
    # type: (Any, Any, Optional[Text], Optional[Text]) -> Text
    """
    Classify the compatibility of a source/sink connection.

    Returns one of "pass", "warning", or "exception".
    """
    # A valueFrom expression can transform the value arbitrarily, so no
    # static judgement is possible.
    if valueFrom is not None:
        return "pass"
    if linkMerge == "merge_nested":
        wrapped = {"items": _get_type(srctype), "type": "array"}
        return check_types(wrapped, _get_type(sinktype), None, None)
    if linkMerge == "merge_flattened":
        return check_types(
            merge_flatten_type(_get_type(srctype)), _get_type(sinktype), None, None)
    if linkMerge is not None:
        raise WorkflowException(u"Unrecognized linkMerge enum '{}'".format(linkMerge))
    # No link merging: compare the types directly, strict first.
    if can_assign_src_to_sink(srctype, sinktype, strict=True):
        return "pass"
    if can_assign_src_to_sink(srctype, sinktype, strict=False):
        return "warning"
    return "exception"
-
-
def merge_flatten_type(src):
    # type: (Any) -> Any
    """Return the type that results from merge_flattened link merging of ``src``."""
    if isinstance(src, MutableSequence):
        return list(map(merge_flatten_type, src))
    already_array = isinstance(src, MutableMapping) and src.get("type") == "array"
    if already_array:
        return src
    return {"items": src, "type": "array"}
-
-
def can_assign_src_to_sink(src, sink, strict=False):  # type: (Any, Any, bool) -> bool
    """
    Check for identical type specifications, ignoring extra keys like inputBinding.

    src: admissible source types
    sink: admissible sink types

    In non-strict comparison, at least one source type must match one sink type.
    In strict comparison, all source types must match at least one sink type.
    """
    if "Any" in (src, sink):
        return True
    if isinstance(src, MutableMapping) and isinstance(sink, MutableMapping):
        if strict and sink.get("not_connected"):
            return False
        src_type = src["type"]
        sink_type = sink["type"]
        if src_type == "array" and sink_type == "array":
            return can_assign_src_to_sink(src["items"], sink["items"], strict)
        if src_type == "record" and sink_type == "record":
            return _compare_records(src, sink, strict)
        if src_type == "File" and sink_type == "File":
            # Strictly, every secondaryFile the sink wants must be
            # declared by the source; non-strictly, File matches File.
            if strict:
                declared = src.get("secondaryFiles", [])
                for wanted in sink.get("secondaryFiles", []):
                    if wanted not in declared:
                        return False
            return True
        return can_assign_src_to_sink(src_type, sink_type, strict)
    if isinstance(src, MutableSequence):
        # Note: the per-alternative check is deliberately non-strict --
        # each source alternative only needs to match *one* sink type.
        if strict:
            return all(can_assign_src_to_sink(alt, sink) for alt in src)
        return any(can_assign_src_to_sink(alt, sink) for alt in src)
    if isinstance(sink, MutableSequence):
        return any(can_assign_src_to_sink(src, alt) for alt in sink)
    return bool(src == sink)
-
-
-def _compare_records(src, sink, strict=False):
-    # type: (MutableMapping[Text, Any], MutableMapping[Text, Any], bool) -> bool
-    """
-    Compare two records, ensuring they have compatible fields.
-
-    This handles normalizing record names, which will be relative to workflow
-    step, so that they can be compared.
-    """
-    def _rec_fields(rec):  # type: (MutableMapping[Text, Any]) -> MutableMapping[Text, Any]
-        out = {}
-        for field in rec["fields"]:
-            name = shortname(field["name"])
-            out[name] = field["type"]
-        return out
-
-    srcfields = _rec_fields(src)
-    sinkfields = _rec_fields(sink)
-    for key in six.iterkeys(sinkfields):
-        if (not can_assign_src_to_sink(
-                srcfields.get(key, "null"), sinkfields.get(key, "null"), strict)
-                and sinkfields.get(key) is not None):
-            _logger.info("Record comparison failure for %s and %s\n"
-                         "Did not match fields for %s: %s and %s",
-                         src["name"], sink["name"], key, srcfields.get(key),
-                         sinkfields.get(key))
-            return False
-    return True
-
def missing_subset(fullset, subset): # type: (List[Any], List[Any]) -> List[Any]
    """Return the members of ``subset`` that are absent from ``fullset``."""
    return [entry for entry in subset if entry not in fullset]
-
def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs, param_to_step):
    # type: (List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], Dict[Text, Dict[Text, Any]]) -> None
    """
    Check if all source and sink types of a workflow are compatible before run time.

    Logs a single aggregated warning for soft mismatches and raises
    schema_salad.validate.ValidationException listing every hard
    incompatibility and every required sink parameter that has no
    source, default, or valueFrom.
    """
    # source parameters: workflow_inputs and step_outputs
    # sink parameters: step_inputs and workflow_outputs

    # make a dictionary of source parameters, indexed by the "id" field
    src_parms = workflow_inputs + step_outputs
    src_dict = {}
    for parm in src_parms:
        src_dict[parm["id"]] = parm

    step_inputs_val = check_all_types(src_dict, step_inputs, "source")
    workflow_outputs_val = check_all_types(src_dict, workflow_outputs, "outputSource")

    warnings = step_inputs_val["warning"] + workflow_outputs_val["warning"]
    exceptions = step_inputs_val["exception"] + workflow_outputs_val["exception"]

    # Render each SrcSink pair into a human-readable, source-line-annotated
    # message.  Warnings fall into three cases: missing secondaryFiles,
    # a sink that is not actually an input of the step, or a plain
    # "may be incompatible" type mismatch.
    warning_msgs = []
    exception_msgs = []
    for warning in warnings:
        src = warning.src
        sink = warning.sink
        linkMerge = warning.linkMerge
        # NOTE(review): this assumes secondaryFiles entries are mappings
        # with "pattern"/"required" keys (normalized CWL form), unlike
        # can_assign_src_to_sink which compares raw entries -- confirm.
        sinksf = sorted([p["pattern"] for p in sink.get("secondaryFiles", []) if p.get("required", True)])
        srcsf = sorted([p["pattern"] for p in src.get("secondaryFiles", [])])
        # Every secondaryFile required by the sink, should be declared
        # by the source
        missing = missing_subset(srcsf, sinksf)
        if missing:
            msg1 = "Parameter '%s' requires secondaryFiles %s but" % (shortname(sink["id"]), missing)
            msg3 = SourceLine(src, "id").makeError(
                "source '%s' does not provide those secondaryFiles." % (shortname(src["id"])))
            msg4 = SourceLine(src.get("_tool_entry", src), "secondaryFiles").makeError("To resolve, add missing secondaryFiles patterns to definition of '%s' or" % (shortname(src["id"])))
            msg5 = SourceLine(sink.get("_tool_entry", sink), "secondaryFiles").makeError("mark missing secondaryFiles in definition of '%s' as optional." % shortname(sink["id"]))
            msg = SourceLine(sink).makeError("%s\n%s" % (msg1, bullets([msg3, msg4, msg5], "  ")))
        elif sink.get("not_connected"):
            # The sink id does not correspond to a declared input of the
            # step's process; list the inputs that do exist.
            msg = SourceLine(sink, "type").makeError(
                "'%s' is not an input parameter of %s, expected %s"
                % (shortname(sink["id"]), param_to_step[sink["id"]]["run"],
                   ", ".join(shortname(s["id"])
                             for s in param_to_step[sink["id"]]["inputs"]
                             if not s.get("not_connected"))))
        else:
            msg = SourceLine(src, "type").makeError(
                "Source '%s' of type %s may be incompatible"
                % (shortname(src["id"]), json_dumps(src["type"]))) + "\n" + \
                SourceLine(sink, "type").makeError(
                    "  with sink '%s' of type %s"
                    % (shortname(sink["id"]), json_dumps(sink["type"])))
            if linkMerge is not None:
                msg += "\n" + SourceLine(sink).makeError("  source has linkMerge method %s" % linkMerge)

        warning_msgs.append(msg)
    # Hard failures: types that cannot be assigned at all.
    for exception in exceptions:
        src = exception.src
        sink = exception.sink
        linkMerge = exception.linkMerge
        msg = SourceLine(src, "type").makeError(
            "Source '%s' of type %s is incompatible"
            % (shortname(src["id"]), json_dumps(src["type"]))) + "\n" + \
            SourceLine(sink, "type").makeError(
                "  with sink '%s' of type %s"
                % (shortname(sink["id"]), json_dumps(sink["type"])))
        if linkMerge is not None:
            msg += "\n" + SourceLine(sink).makeError("  source has linkMerge method %s" % linkMerge)
        exception_msgs.append(msg)

    # A non-nullable step input must be fed by a source, a default, or a
    # valueFrom expression; anything else is a hard error.
    for sink in step_inputs:
        if ('null' != sink["type"] and 'null' not in sink["type"]
                and "source" not in sink and "default" not in sink and "valueFrom" not in sink):
            msg = SourceLine(sink).makeError(
                "Required parameter '%s' does not have source, default, or valueFrom expression"
                % shortname(sink["id"]))
            exception_msgs.append(msg)

    all_warning_msg = strip_dup_lineno("\n".join(warning_msgs))
    all_exception_msg = strip_dup_lineno("\n".join(exception_msgs))

    if warnings:
        _logger.warning("Workflow checker warning:\n%s", all_warning_msg)
    if exceptions:
        raise validate.ValidationException(all_exception_msg)
-
-
-SrcSink = namedtuple("SrcSink", ["src", "sink", "linkMerge"])
-
def check_all_types(src_dict, sinks, sourceField):
    # type: (Dict[Text, Any], List[Dict[Text, Any]], Text) -> Dict[Text, List[SrcSink]]
    """
    Given a list of sinks, check if their types match with the types of their sources.

    src_dict maps source parameter ids to their parameter records.
    sourceField is either "source" (step inputs) or "outputSource"
    (workflow outputs).

    Returns a dict with "warning" and "exception" lists of SrcSink pairs.
    """
    validation = {"warning": [], "exception": []}  # type: Dict[Text, List[SrcSink]]
    for sink in sinks:
        if sourceField not in sink:
            # A sink with no source (e.g. default-only) has nothing to check.
            continue
        valueFrom = sink.get("valueFrom")
        if isinstance(sink[sourceField], MutableSequence):
            srcs_of_sink = [src_dict[parm_id] for parm_id in sink[sourceField]]
            # Multiple sources are merge_nested by default unless the sink
            # declares an explicit linkMerge method.
            linkMerge = sink.get("linkMerge", ("merge_nested"
                                               if len(sink[sourceField]) > 1 else None))
        else:
            srcs_of_sink = [src_dict[sink[sourceField]]]
            linkMerge = None
        for src in srcs_of_sink:
            check_result = check_types(src, sink, linkMerge, valueFrom)
            if check_result == "warning":
                validation["warning"].append(SrcSink(src, sink, linkMerge))
            elif check_result == "exception":
                validation["exception"].append(SrcSink(src, sink, linkMerge))
    return validation