view env/lib/python3.7/site-packages/schema_salad/validate.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

from __future__ import absolute_import

import logging
import pprint
from typing import (  # noqa: F401
    Any,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Set,
    Union,
)

import six
from six.moves import urllib
from typing_extensions import Text  # pylint: disable=unused-import

from . import avro
from .exceptions import (
    ClassValidationException,
    ValidationException,
    SchemaSaladException,
)
from .avro import schema  # noqa: F401
from .avro.schema import (  # pylint: disable=unused-import, no-name-in-module, import-error
    Schema,
)
from .sourceline import SourceLine

# move to a regular typing import when Python 3.3-3.6 is no longer supported


_logger = logging.getLogger("salad")


def validate(
    expected_schema,  # type: Schema
    datum,  # type: Any
    identifiers=None,  # type: Optional[List[Text]]
    strict=False,  # type: bool
    foreign_properties=None,  # type: Optional[Set[Text]]
):
    # type: (...) -> bool
    if not identifiers:
        identifiers = []
    if not foreign_properties:
        foreign_properties = set()
    return validate_ex(
        expected_schema,
        datum,
        identifiers,
        strict=strict,
        foreign_properties=foreign_properties,
        raise_ex=False,
    )


INT_MIN_VALUE = -(1 << 31)
INT_MAX_VALUE = (1 << 31) - 1
LONG_MIN_VALUE = -(1 << 63)
LONG_MAX_VALUE = (1 << 63) - 1


def friendly(v):  # type: (Any) -> Any
    if isinstance(v, avro.schema.NamedSchema):
        return v.name
    if isinstance(v, avro.schema.ArraySchema):
        return "array of <{}>".format(friendly(v.items))
    elif isinstance(v, avro.schema.PrimitiveSchema):
        return v.type
    elif isinstance(v, avro.schema.UnionSchema):
        return " or ".join([friendly(s) for s in v.schemas])
    else:
        return v


def vpformat(datum):  # type: (Any) -> str
    a = pprint.pformat(datum)
    if len(a) > 160:
        a = a[0:160] + "[...]"
    return a


def validate_ex(
    expected_schema,  # type: Schema
    datum,  # type: Any
    identifiers=None,  # type: Optional[List[Text]]
    strict=False,  # type: bool
    foreign_properties=None,  # type: Optional[Set[Text]]
    raise_ex=True,  # type: bool
    strict_foreign_properties=False,  # type: bool
    logger=_logger,  # type: logging.Logger
    skip_foreign_properties=False,  # type: bool
):
    # type: (...) -> bool
    """Determine if a python datum is an instance of a schema."""

    if not identifiers:
        identifiers = []

    if not foreign_properties:
        foreign_properties = set()

    schema_type = expected_schema.type

    if schema_type == "null":
        if datum is None:
            return True
        else:
            if raise_ex:
                raise ValidationException(u"the value is not null")
            else:
                return False
    elif schema_type == "boolean":
        if isinstance(datum, bool):
            return True
        else:
            if raise_ex:
                raise ValidationException(u"the value is not boolean")
            else:
                return False
    elif schema_type == "string":
        if isinstance(datum, six.string_types):
            return True
        elif isinstance(datum, bytes):
            datum = datum.decode(u"utf-8")
            return True
        else:
            if raise_ex:
                raise ValidationException(u"the value is not string")
            else:
                return False
    elif schema_type == "int":
        if (
            isinstance(datum, six.integer_types)
            and INT_MIN_VALUE <= datum <= INT_MAX_VALUE
        ):
            return True
        else:
            if raise_ex:
                raise ValidationException(u"`{}` is not int".format(vpformat(datum)))
            else:
                return False
    elif schema_type == "long":
        if (
            isinstance(datum, six.integer_types)
        ) and LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE:
            return True
        else:
            if raise_ex:
                raise ValidationException(
                    u"the value `{}` is not long".format(vpformat(datum))
                )
            else:
                return False
    elif schema_type in ["float", "double"]:
        if isinstance(datum, six.integer_types) or isinstance(datum, float):
            return True
        else:
            if raise_ex:
                raise ValidationException(
                    u"the value `{}` is not float or double".format(vpformat(datum))
                )
            else:
                return False
    elif isinstance(expected_schema, avro.schema.EnumSchema):
        if expected_schema.name == "Any":
            if datum is not None:
                return True
            else:
                if raise_ex:
                    raise ValidationException(u"'Any' type must be non-null")
                else:
                    return False
        if not isinstance(datum, six.string_types):
            if raise_ex:
                raise ValidationException(
                    u"value is a {} but expected a string".format(
                        (type(datum).__name__)
                    )
                )
            else:
                return False
        if expected_schema.name == "Expression":
            if "$(" in datum or "${" in datum:
                return True
            if raise_ex:
                raise ValidationException(
                    u"value `%s` does not contain an expression in the form $() or ${}"
                    % datum
                )
            else:
                return False
        if datum in expected_schema.symbols:
            return True
        else:
            if raise_ex:
                raise ValidationException(
                    u"the value {} is not a valid {}, expected {}{}".format(
                        vpformat(datum),
                        expected_schema.name,
                        "one of " if len(expected_schema.symbols) > 1 else "",
                        "'" + "', '".join(expected_schema.symbols) + "'",
                    )
                )
            else:
                return False
    elif isinstance(expected_schema, avro.schema.ArraySchema):
        if isinstance(datum, MutableSequence):
            for i, d in enumerate(datum):
                try:
                    sl = SourceLine(datum, i, ValidationException)
                    if not validate_ex(
                        expected_schema.items,
                        d,
                        identifiers,
                        strict=strict,
                        foreign_properties=foreign_properties,
                        raise_ex=raise_ex,
                        strict_foreign_properties=strict_foreign_properties,
                        logger=logger,
                        skip_foreign_properties=skip_foreign_properties,
                    ):
                        return False
                except ValidationException as v:
                    if raise_ex:
                        raise ValidationException("item is invalid because", sl, [v])
                    else:
                        return False
            return True
        else:
            if raise_ex:
                raise ValidationException(
                    u"the value {} is not a list, expected list of {}".format(
                        vpformat(datum), friendly(expected_schema.items)
                    )
                )
            else:
                return False
    elif isinstance(expected_schema, avro.schema.UnionSchema):
        for s in expected_schema.schemas:
            if validate_ex(
                s,
                datum,
                identifiers,
                strict=strict,
                raise_ex=False,
                strict_foreign_properties=strict_foreign_properties,
                logger=logger,
                skip_foreign_properties=skip_foreign_properties,
            ):
                return True

        if not raise_ex:
            return False

        errors = []  # type: List[SchemaSaladException]
        checked = []
        for s in expected_schema.schemas:
            if isinstance(datum, MutableSequence) and not isinstance(
                s, avro.schema.ArraySchema
            ):
                continue
            elif isinstance(datum, MutableMapping) and not isinstance(
                s, avro.schema.RecordSchema
            ):
                continue
            elif isinstance(
                datum, (bool, six.integer_types, float, six.string_types)
            ) and isinstance(s, (avro.schema.ArraySchema, avro.schema.RecordSchema)):
                continue
            elif datum is not None and s.type == "null":
                continue

            checked.append(s)
            try:
                validate_ex(
                    s,
                    datum,
                    identifiers,
                    strict=strict,
                    foreign_properties=foreign_properties,
                    raise_ex=True,
                    strict_foreign_properties=strict_foreign_properties,
                    logger=logger,
                    skip_foreign_properties=skip_foreign_properties,
                )
            except ClassValidationException:
                raise
            except ValidationException as e:
                errors.append(e)
        if bool(errors):
            raise ValidationException(
                "",
                None,
                [
                    ValidationException(
                        "tried {} but".format(friendly(check)), None, [err]
                    )
                    for (check, err) in zip(checked, errors)
                ],
                "-",
            )
        else:
            raise ValidationException(
                "value is a {}, expected {}".format(
                    type(datum).__name__, friendly(expected_schema)
                )
            )

    elif isinstance(expected_schema, avro.schema.RecordSchema):
        if not isinstance(datum, MutableMapping):
            if raise_ex:
                raise ValidationException(u"is not a dict")
            else:
                return False

        classmatch = None
        for f in expected_schema.fields:
            if f.name in ("class",):
                d = datum.get(f.name)
                if not d:
                    if raise_ex:
                        raise ValidationException(u"Missing '{}' field".format(f.name))
                    else:
                        return False
                if expected_schema.name != d:
                    if raise_ex:
                        raise ValidationException(
                            u"Expected class '{}' but this is '{}'".format(
                                expected_schema.name, d
                            )
                        )
                    else:
                        return False
                classmatch = d
                break

        errors = []
        for f in expected_schema.fields:
            if f.name in ("class",):
                continue

            if f.name in datum:
                fieldval = datum[f.name]
            else:
                try:
                    fieldval = f.default
                except KeyError:
                    fieldval = None

            try:
                sl = SourceLine(datum, f.name, six.text_type)
                if not validate_ex(
                    f.type,
                    fieldval,
                    identifiers,
                    strict=strict,
                    foreign_properties=foreign_properties,
                    raise_ex=raise_ex,
                    strict_foreign_properties=strict_foreign_properties,
                    logger=logger,
                    skip_foreign_properties=skip_foreign_properties,
                ):
                    return False
            except ValidationException as v:
                if f.name not in datum:
                    errors.append(
                        ValidationException(
                            u"missing required field `{}`".format(f.name)
                        )
                    )
                else:
                    errors.append(
                        ValidationException(
                            u"the `{}` field is not valid because".format(f.name),
                            sl,
                            [v],
                        )
                    )

        for d in datum:
            found = False
            for f in expected_schema.fields:
                if d == f.name:
                    found = True
            if not found:
                sl = SourceLine(datum, d, six.text_type)
                if d is None:
                    err = ValidationException(u"mapping with implicit null key", sl)
                    if strict:
                        errors.append(err)
                    else:
                        logger.warning(err)
                    continue
                if (
                    d not in identifiers
                    and d not in foreign_properties
                    and d[0] not in ("@", "$")
                ):
                    if (
                        (d not in identifiers and strict)
                        and (
                            d not in foreign_properties
                            and strict_foreign_properties
                            and not skip_foreign_properties
                        )
                        and not raise_ex
                    ):
                        return False
                    split = urllib.parse.urlsplit(d)
                    if split.scheme:
                        if not skip_foreign_properties:
                            err = ValidationException(
                                u"unrecognized extension field `{}`{}.{}".format(
                                    d,
                                    " and strict_foreign_properties checking is enabled"
                                    if strict_foreign_properties
                                    else "",
                                    "\nForeign properties from $schemas:\n  {}".format(
                                        "\n  ".join(sorted(foreign_properties))
                                    )
                                    if len(foreign_properties) > 0
                                    else "",
                                ),
                                sl,
                            )
                            if strict_foreign_properties:
                                errors.append(err)
                            elif len(foreign_properties) > 0:
                                logger.warning(err)
                    else:
                        err = ValidationException(
                            u"invalid field `{}`, expected one of: {}".format(
                                d,
                                ", ".join(
                                    "'{}'".format(fn.name)
                                    for fn in expected_schema.fields
                                ),
                            ),
                            sl,
                        )
                        if strict:
                            errors.append(err)
                        else:
                            logger.warning(err)

        if bool(errors):
            if raise_ex:
                if classmatch:
                    raise ClassValidationException("", None, errors, "*")
                else:
                    raise ValidationException("", None, errors, "*")
            else:
                return False
        else:
            return True
    if raise_ex:
        raise ValidationException(u"Unrecognized schema_type {}".format(schema_type))
    else:
        return False