view env/lib/python3.7/site-packages/schema_salad/avro/schema.py @ 3:758bc20232e8 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:20:52 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications copyright (C) 2017-2018 Common Workflow Language.
"""
Contains the Schema classes.

A schema may be one of:
  A record, mapping field names to field value data;
  An enum, containing one of a small set of symbols;
  An array of values, all of the same schema;
  A union of other schemas;
  A unicode string;
  A 32-bit signed int;
  A 64-bit signed long;
  A 32-bit floating-point float;
  A 64-bit floating-point double;
  A boolean; or
  Null.
"""
from typing import Any, Dict, List, Optional, Text, Tuple, Union, cast

from schema_salad.exceptions import SchemaException

import six

#
# Constants
#

PRIMITIVE_TYPES = ("null", "boolean", "string", "int", "long", "float", "double")

NAMED_TYPES = ("enum", "record")

VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + ("array", "union")

SCHEMA_RESERVED_PROPS = (
    "type",
    "name",
    "namespace",
    "fields",  # Record
    "items",  # Array
    "symbols",  # Enum
    "doc",
)

FIELD_RESERVED_PROPS = ("default", "name", "doc", "order", "type")

VALID_FIELD_SORT_ORDERS = ("ascending", "descending", "ignore")

#
# Exceptions
#


class AvroException(SchemaException):
    pass


class SchemaParseException(AvroException):
    pass


#
# Base Classes
#


class Schema(object):
    """Base class for all Schema classes."""

    def __init__(self, atype, other_props=None):
        # type: (Text, Optional[Dict[Text, Any]]) -> None
        # Ensure valid ctor args
        if not isinstance(atype, six.string_types):
            raise SchemaParseException(
                "Schema type '{}' must be a string, was '{}.".format(atype, type(atype))
            )
        elif atype not in VALID_TYPES:
            fail_msg = "%s is not a valid type." % atype
            raise SchemaParseException(fail_msg)

        # add members
        if not hasattr(self, "_props"):
            self._props = {}  # type: Dict[Text, Any]
        self.set_prop("type", atype)
        self.type = atype
        self._props.update(other_props or {})

    # Read-only properties dict. Printing schemas
    # creates JSON properties directly from this dict.
    props = property(lambda self: self._props)

    # utility functions to manipulate properties dict
    def get_prop(self, key):  # type: (Text) -> Any
        return self._props.get(key)

    def set_prop(self, key, value):  # type: (Text, Any) -> None
        self._props[key] = value


class Name(object):
    """Class to describe Avro name."""

    def __init__(self, name_attr, space_attr, default_space):
        # type: (Text, Optional[Text], Optional[Text]) -> None
        """
        Formulate full name according to the specification.

        @arg name_attr: name value read in schema or None.
        @arg space_attr: namespace value read in schema or None.
        @ard default_space: the current default space or None.
        """
        # Ensure valid ctor args
        if not (isinstance(name_attr, six.string_types) or (name_attr is None)):
            fail_msg = "Name must be non-empty string or None."
            raise SchemaParseException(fail_msg)
        elif name_attr == "":
            fail_msg = "Name must be non-empty string or None."
            raise SchemaParseException(fail_msg)

        if not (isinstance(space_attr, six.string_types) or (space_attr is None)):
            fail_msg = "Space must be non-empty string or None."
            raise SchemaParseException(fail_msg)
        elif name_attr == "":
            fail_msg = "Space must be non-empty string or None."
            raise SchemaParseException(fail_msg)

        if not (isinstance(default_space, six.string_types) or (default_space is None)):
            fail_msg = "Default space must be non-empty string or None."
            raise SchemaParseException(fail_msg)
        elif name_attr == "":
            fail_msg = "Default must be non-empty string or None."
            raise SchemaParseException(fail_msg)

        self._full = None  # type: Optional[Text]

        if name_attr is None or name_attr == "":
            return

        if name_attr.find(".") < 0:
            if (space_attr is not None) and (space_attr != ""):
                self._full = "%s.%s" % (space_attr, name_attr)
            else:
                if (default_space is not None) and (default_space != ""):
                    self._full = "%s.%s" % (default_space, name_attr)
                else:
                    self._full = name_attr
        else:
            self._full = name_attr

    fullname = property(lambda self: self._full)

    def get_space(self):
        # type: () -> Optional[Text]
        """Back out a namespace from full name."""
        if self._full is None:
            return None

        if self._full.find(".") > 0:
            return self._full.rsplit(".", 1)[0]
        else:
            return ""


class Names(object):
    """Track name set and default namespace during parsing."""

    def __init__(self, default_namespace=None):
        # type: (Optional[Text]) -> None
        self.names = {}  # type: Dict[Text, NamedSchema]
        self.default_namespace = default_namespace

    def has_name(self, name_attr, space_attr):
        # type: (Text, Optional[Text]) -> bool
        test = Name(name_attr, space_attr, self.default_namespace).fullname
        return test in self.names

    def get_name(self, name_attr, space_attr):
        # type: (Text, Optional[Text]) -> Optional[NamedSchema]
        test = Name(name_attr, space_attr, self.default_namespace).fullname
        if test not in self.names:
            return None
        return self.names[test]

    def add_name(self, name_attr, space_attr, new_schema):
        # type: (Text, Optional[Text], NamedSchema) -> Name
        """
        Add a new schema object to the name set.

          @arg name_attr: name value read in schema
          @arg space_attr: namespace value read in schema.

          @return: the Name that was just added.
        """
        to_add = Name(name_attr, space_attr, self.default_namespace)

        if to_add.fullname in VALID_TYPES:
            fail_msg = "%s is a reserved type name." % to_add.fullname
            raise SchemaParseException(fail_msg)
        elif to_add.fullname in self.names:
            fail_msg = 'The name "%s" is already in use.' % to_add.fullname
            raise SchemaParseException(fail_msg)

        self.names[to_add.fullname] = new_schema
        return to_add


class NamedSchema(Schema):
    """Named Schemas specified in NAMED_TYPES."""

    def __init__(
        self,
        atype,  # type: Text
        name,  # type: Text
        namespace=None,  # type: Optional[Text]
        names=None,  # type: Optional[Names]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        # Ensure valid ctor args
        if not name:
            fail_msg = "Named Schemas must have a non-empty name."
            raise SchemaParseException(fail_msg)
        elif not isinstance(name, six.string_types):
            fail_msg = "The name property must be a string."
            raise SchemaParseException(fail_msg)
        elif namespace is not None and not isinstance(namespace, six.string_types):
            fail_msg = "The namespace property must be a string."
            raise SchemaParseException(fail_msg)
        if names is None:
            raise SchemaParseException("Must provide Names.")

        # Call parent ctor
        Schema.__init__(self, atype, other_props)

        # Add class members
        new_name = names.add_name(name, namespace, self)

        # Store name and namespace as they were read in origin schema
        self.set_prop("name", name)
        if namespace is not None:
            self.set_prop("namespace", new_name.get_space())

        # Store full name as calculated from name, namespace
        self._fullname = new_name.fullname

    # read-only properties
    name = property(lambda self: self.get_prop("name"))


class Field(object):
    def __init__(
        self,
        atype,  # type: Union[Text, Dict[Text, Text]]
        name,  # type: Text
        has_default,  # type: bool
        default=None,  # type: Optional[Text]
        order=None,  # type: Optional[Text]
        names=None,  # type: Optional[Names]
        doc=None,  # type: Optional[Text]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        # Ensure valid ctor args
        if not name:
            fail_msg = "Fields must have a non-empty name."
            raise SchemaParseException(fail_msg)
        elif not isinstance(name, six.string_types):
            fail_msg = "The name property must be a string."
            raise SchemaParseException(fail_msg)
        elif order is not None and order not in VALID_FIELD_SORT_ORDERS:
            fail_msg = "The order property %s is not valid." % order
            raise SchemaParseException(fail_msg)

        # add members
        self._props = {}  # type: Dict[Text, Union[Schema, Text, None]]
        self._has_default = has_default
        self._props.update(other_props or {})

        if (
            isinstance(atype, six.string_types)
            and names is not None
            and names.has_name(atype, None)
        ):
            type_schema = cast(NamedSchema, names.get_name(atype, None))  # type: Schema
        else:
            try:
                type_schema = make_avsc_object(cast(Dict[Text, Text], atype), names)
            except Exception as e:
                raise SchemaParseException(
                    'Type property "%s" not a valid Avro schema: %s' % (atype, e)
                )
        self.set_prop("type", type_schema)
        self.set_prop("name", name)
        self.type = type_schema
        self.name = name
        # TODO(hammer): check to ensure default is valid
        if has_default:
            self.set_prop("default", default)
        if order is not None:
            self.set_prop("order", order)
        if doc is not None:
            self.set_prop("doc", doc)

    # read-only properties
    default = property(lambda self: self.get_prop("default"))

    # utility functions to manipulate properties dict
    def get_prop(self, key):  # type: (Text) -> Union[Schema, Text, None]
        return self._props.get(key)

    def set_prop(self, key, value):
        # type: (Text, Union[Schema, Text, None]) -> None
        self._props[key] = value


#
# Primitive Types
#
class PrimitiveSchema(Schema):
    """Valid primitive types are in PRIMITIVE_TYPES."""

    def __init__(self, atype, other_props=None):
        # type: (Text, Optional[Dict[Text, Text]]) -> None
        # Ensure valid ctor args
        if atype not in PRIMITIVE_TYPES:
            raise AvroException("%s is not a valid primitive type." % atype)

        # Call parent ctor
        Schema.__init__(self, atype, other_props=other_props)

        self.fullname = atype


#
# Complex Types (non-recursive)
#


class EnumSchema(NamedSchema):
    def __init__(
        self,
        name,  # type: Text
        namespace,  # type: Text
        symbols,  # type: List[Text]
        names=None,  # type: Optional[Names]
        doc=None,  # type: Optional[Text]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        # Ensure valid ctor args
        if not isinstance(symbols, list):
            fail_msg = "Enum Schema requires a JSON array for the symbols property."
            raise AvroException(fail_msg)
        elif False in [isinstance(s, six.string_types) for s in symbols]:
            fail_msg = "Enum Schema requires all symbols to be JSON strings."
            raise AvroException(fail_msg)
        elif len(set(symbols)) < len(symbols):
            fail_msg = "Duplicate symbol: %s" % symbols
            raise AvroException(fail_msg)

        # Call parent ctor
        NamedSchema.__init__(self, "enum", name, namespace, names, other_props)

        # Add class members
        self.set_prop("symbols", symbols)
        if doc is not None:
            self.set_prop("doc", doc)

    # read-only properties
    symbols = property(lambda self: self.get_prop("symbols"))


#
# Complex Types (recursive)
#


class ArraySchema(Schema):
    def __init__(self, items, names=None, other_props=None):
        # type: (List[Any], Optional[Names], Optional[Dict[Text, Text]]) -> None
        # Call parent ctor
        Schema.__init__(self, "array", other_props)
        # Add class members

        if names is None:
            raise SchemaParseException("Must provide Names.")
        if isinstance(items, six.string_types) and names.has_name(items, None):
            items_schema = cast(Schema, names.get_name(items, None))
        else:
            try:
                items_schema = make_avsc_object(items, names)
            except Exception as err:
                raise SchemaParseException(
                    "Items schema (%s) not a valid Avro schema: %s (known "
                    "names: %s)" % (items, err, list(names.names.keys()))
                )

        self.set_prop("items", items_schema)

    # read-only properties
    items = property(lambda self: self.get_prop("items"))


class UnionSchema(Schema):
    """
    names is a dictionary of schema objects
    """

    def __init__(self, schemas, names=None):
        # type: (List[Schema], Optional[Names]) -> None
        # Ensure valid ctor args
        if names is None:
            raise SchemaParseException("Must provide Names.")
        if not isinstance(schemas, list):
            fail_msg = "Union schema requires a list of schemas."
            raise SchemaParseException(fail_msg)

        # Call parent ctor
        Schema.__init__(self, "union")

        # Add class members
        schema_objects = []  # type: List[Schema]
        for schema in schemas:
            if isinstance(schema, six.string_types) and names.has_name(schema, None):
                new_schema = cast(Schema, names.get_name(schema, None))
            else:
                try:
                    new_schema = make_avsc_object(schema, names)  # type: ignore
                except Exception as err:
                    raise SchemaParseException(
                        "Union item must be a valid Avro schema: %s" % Text(err)
                    )
            # check the new schema
            if (
                new_schema.type in VALID_TYPES
                and new_schema.type not in NAMED_TYPES
                and new_schema.type in [schema.type for schema in schema_objects]
            ):
                raise SchemaParseException("%s type already in Union" % new_schema.type)
            elif new_schema.type == "union":
                raise SchemaParseException("Unions cannot contain other unions.")
            else:
                schema_objects.append(new_schema)
        self._schemas = schema_objects

    # read-only properties
    schemas = property(lambda self: self._schemas)


class RecordSchema(NamedSchema):
    @staticmethod
    def make_field_objects(field_data, names):
        # type: (List[Dict[Text, Text]], Names) -> List[Field]
        """We're going to need to make message parameters too."""
        field_objects = []
        field_names = []  # type: List[Text]
        for field in field_data:
            if hasattr(field, "get") and callable(field.get):
                atype = cast(Text, field.get("type"))
                name = cast(Text, field.get("name"))

                # null values can have a default value of None
                has_default = False
                default = None
                if "default" in field:
                    has_default = True
                    default = field.get("default")

                order = field.get("order")
                doc = field.get("doc")
                other_props = get_other_props(field, FIELD_RESERVED_PROPS)
                new_field = Field(
                    atype, name, has_default, default, order, names, doc, other_props
                )
                # make sure field name has not been used yet
                if new_field.name in field_names:
                    fail_msg = "Field name %s already in use." % new_field.name
                    raise SchemaParseException(fail_msg)
                field_names.append(new_field.name)
            else:
                raise SchemaParseException("Not a valid field: %s" % field)
            field_objects.append(new_field)
        return field_objects

    def __init__(
        self,
        name,  # type: Text
        namespace,  # type: Text
        fields,  # type: List[Dict[Text, Text]]
        names=None,  # type: Optional[Names]
        schema_type="record",  # type: Text
        doc=None,  # type: Optional[Text]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        # Ensure valid ctor args
        if fields is None:
            fail_msg = "Record schema requires a non-empty fields property."
            raise SchemaParseException(fail_msg)
        elif not isinstance(fields, list):
            fail_msg = "Fields property must be a list of Avro schemas."
            raise SchemaParseException(fail_msg)
        if names is None:
            raise SchemaParseException("Must provide Names.")

        # Call parent ctor (adds own name to namespace, too)
        NamedSchema.__init__(self, schema_type, name, namespace, names, other_props)

        if schema_type == "record":
            old_default = names.default_namespace
            names.default_namespace = Name(
                name, namespace, names.default_namespace
            ).get_space()

        # Add class members
        field_objects = RecordSchema.make_field_objects(fields, names)
        self.set_prop("fields", field_objects)
        if doc is not None:
            self.set_prop("doc", doc)

        if schema_type == "record":
            names.default_namespace = old_default

    # read-only properties
    fields = property(lambda self: self.get_prop("fields"))


#
# Module Methods
#
def get_other_props(all_props, reserved_props):
    # type: (Dict[Text, Text], Tuple[Text, ...]) -> Optional[Dict[Text, Text]]
    """
    Retrieve the non-reserved properties from a dictionary of properties
    @args reserved_props: The set of reserved properties to exclude
    """
    if hasattr(all_props, "items") and callable(all_props.items):
        return dict(
            [(k, v) for (k, v) in list(all_props.items()) if k not in reserved_props]
        )
    return None


def make_avsc_object(json_data, names=None):
    # type: (Union[Dict[Text, Text], List[Any], Text], Optional[Names]) -> Schema
    """
    Build Avro Schema from data parsed out of JSON string.

    @arg names: A Name object (tracks seen names and default space)
    """
    if names is None:
        names = Names()
    assert isinstance(names, Names)

    # JSON object (non-union)
    if hasattr(json_data, "get") and callable(json_data.get):  # type: ignore
        assert isinstance(json_data, Dict)
        atype = cast(Text, json_data.get("type"))
        other_props = get_other_props(json_data, SCHEMA_RESERVED_PROPS)
        if atype in PRIMITIVE_TYPES:
            return PrimitiveSchema(atype, other_props)
        if atype in NAMED_TYPES:
            name = cast(Text, json_data.get("name"))
            namespace = cast(Text, json_data.get("namespace", names.default_namespace))
            if atype == "enum":
                symbols = cast(List[Text], json_data.get("symbols"))
                doc = json_data.get("doc")
                return EnumSchema(name, namespace, symbols, names, doc, other_props)
            if atype in ["record", "error"]:
                fields = cast(List[Dict[Text, Text]], json_data.get("fields"))
                doc = json_data.get("doc")
                return RecordSchema(
                    name, namespace, fields, names, atype, doc, other_props
                )
            raise SchemaParseException("Unknown Named Type: %s" % atype)
        if atype in VALID_TYPES:
            if atype == "array":
                items = cast(List[Text], json_data.get("items"))
                return ArraySchema(items, names, other_props)
        if atype is None:
            raise SchemaParseException('No "type" property: %s' % json_data)
        raise SchemaParseException("Undefined type: %s" % atype)
    # JSON array (union)
    if isinstance(json_data, list):
        return UnionSchema(json_data, names)
    # JSON string (primitive)
    if json_data in PRIMITIVE_TYPES:
        return PrimitiveSchema(cast(Text, json_data))
    # not for us!
    fail_msg = "Could not make an Avro Schema object from %s." % json_data
    raise SchemaParseException(fail_msg)