Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/schema_salad/avro/schema.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/schema_salad/avro/schema.py Thu May 14 14:56:58 2020 -0400 @@ -0,0 +1,601 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# Modifications copyright (C) 2017-2018 Common Workflow Language. +""" +Contains the Schema classes. + +A schema may be one of: + A record, mapping field names to field value data; + An enum, containing one of a small set of symbols; + An array of values, all of the same schema; + A union of other schemas; + A unicode string; + A 32-bit signed int; + A 64-bit signed long; + A 32-bit floating-point float; + A 64-bit floating-point double; + A boolean; or + Null. +""" +from typing import Any, Dict, List, Optional, Text, Tuple, Union, cast + +from schema_salad.exceptions import SchemaException + +import six + +# +# Constants +# + +PRIMITIVE_TYPES = ("null", "boolean", "string", "int", "long", "float", "double") + +NAMED_TYPES = ("enum", "record") + +VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + ("array", "union") + +SCHEMA_RESERVED_PROPS = ( + "type", + "name", + "namespace", + "fields", # Record + "items", # Array + "symbols", # Enum + "doc", +) + +FIELD_RESERVED_PROPS = ("default", "name", "doc", "order", "type") + +VALID_FIELD_SORT_ORDERS = ("ascending", "descending", "ignore") + +# +# Exceptions +# + + +class AvroException(SchemaException): + pass + + +class SchemaParseException(AvroException): + pass + + +# +# Base Classes +# + + +class Schema(object): + """Base class for all Schema classes.""" + + def __init__(self, atype, other_props=None): + # type: (Text, Optional[Dict[Text, Any]]) -> None + # Ensure valid ctor args + if not isinstance(atype, six.string_types): + raise SchemaParseException( + "Schema type '{}' must be a string, was '{}.".format(atype, type(atype)) + ) + elif atype not in VALID_TYPES: + fail_msg = "%s is not a valid type." % atype + raise SchemaParseException(fail_msg) + + # add members + if not hasattr(self, "_props"): + self._props = {} # type: Dict[Text, Any] + self.set_prop("type", atype) + self.type = atype + self._props.update(other_props or {}) + + # Read-only properties dict. Printing schemas + # creates JSON properties directly from this dict. + props = property(lambda self: self._props) + + # utility functions to manipulate properties dict + def get_prop(self, key): # type: (Text) -> Any + return self._props.get(key) + + def set_prop(self, key, value): # type: (Text, Any) -> None + self._props[key] = value + + +class Name(object): + """Class to describe Avro name.""" + + def __init__(self, name_attr, space_attr, default_space): + # type: (Text, Optional[Text], Optional[Text]) -> None + """ + Formulate full name according to the specification. + + @arg name_attr: name value read in schema or None. + @arg space_attr: namespace value read in schema or None. + @ard default_space: the current default space or None. + """ + # Ensure valid ctor args + if not (isinstance(name_attr, six.string_types) or (name_attr is None)): + fail_msg = "Name must be non-empty string or None." + raise SchemaParseException(fail_msg) + elif name_attr == "": + fail_msg = "Name must be non-empty string or None." + raise SchemaParseException(fail_msg) + + if not (isinstance(space_attr, six.string_types) or (space_attr is None)): + fail_msg = "Space must be non-empty string or None." + raise SchemaParseException(fail_msg) + elif name_attr == "": + fail_msg = "Space must be non-empty string or None." + raise SchemaParseException(fail_msg) + + if not (isinstance(default_space, six.string_types) or (default_space is None)): + fail_msg = "Default space must be non-empty string or None." + raise SchemaParseException(fail_msg) + elif name_attr == "": + fail_msg = "Default must be non-empty string or None." + raise SchemaParseException(fail_msg) + + self._full = None # type: Optional[Text] + + if name_attr is None or name_attr == "": + return + + if name_attr.find(".") < 0: + if (space_attr is not None) and (space_attr != ""): + self._full = "%s.%s" % (space_attr, name_attr) + else: + if (default_space is not None) and (default_space != ""): + self._full = "%s.%s" % (default_space, name_attr) + else: + self._full = name_attr + else: + self._full = name_attr + + fullname = property(lambda self: self._full) + + def get_space(self): + # type: () -> Optional[Text] + """Back out a namespace from full name.""" + if self._full is None: + return None + + if self._full.find(".") > 0: + return self._full.rsplit(".", 1)[0] + else: + return "" + + +class Names(object): + """Track name set and default namespace during parsing.""" + + def __init__(self, default_namespace=None): + # type: (Optional[Text]) -> None + self.names = {} # type: Dict[Text, NamedSchema] + self.default_namespace = default_namespace + + def has_name(self, name_attr, space_attr): + # type: (Text, Optional[Text]) -> bool + test = Name(name_attr, space_attr, self.default_namespace).fullname + return test in self.names + + def get_name(self, name_attr, space_attr): + # type: (Text, Optional[Text]) -> Optional[NamedSchema] + test = Name(name_attr, space_attr, self.default_namespace).fullname + if test not in self.names: + return None + return self.names[test] + + def add_name(self, name_attr, space_attr, new_schema): + # type: (Text, Optional[Text], NamedSchema) -> Name + """ + Add a new schema object to the name set. + + @arg name_attr: name value read in schema + @arg space_attr: namespace value read in schema. + + @return: the Name that was just added. + """ + to_add = Name(name_attr, space_attr, self.default_namespace) + + if to_add.fullname in VALID_TYPES: + fail_msg = "%s is a reserved type name." % to_add.fullname + raise SchemaParseException(fail_msg) + elif to_add.fullname in self.names: + fail_msg = 'The name "%s" is already in use.' % to_add.fullname + raise SchemaParseException(fail_msg) + + self.names[to_add.fullname] = new_schema + return to_add + + +class NamedSchema(Schema): + """Named Schemas specified in NAMED_TYPES.""" + + def __init__( + self, + atype, # type: Text + name, # type: Text + namespace=None, # type: Optional[Text] + names=None, # type: Optional[Names] + other_props=None, # type: Optional[Dict[Text, Text]] + ): # type: (...) -> None + # Ensure valid ctor args + if not name: + fail_msg = "Named Schemas must have a non-empty name." + raise SchemaParseException(fail_msg) + elif not isinstance(name, six.string_types): + fail_msg = "The name property must be a string." + raise SchemaParseException(fail_msg) + elif namespace is not None and not isinstance(namespace, six.string_types): + fail_msg = "The namespace property must be a string." + raise SchemaParseException(fail_msg) + if names is None: + raise SchemaParseException("Must provide Names.") + + # Call parent ctor + Schema.__init__(self, atype, other_props) + + # Add class members + new_name = names.add_name(name, namespace, self) + + # Store name and namespace as they were read in origin schema + self.set_prop("name", name) + if namespace is not None: + self.set_prop("namespace", new_name.get_space()) + + # Store full name as calculated from name, namespace + self._fullname = new_name.fullname + + # read-only properties + name = property(lambda self: self.get_prop("name")) + + +class Field(object): + def __init__( + self, + atype, # type: Union[Text, Dict[Text, Text]] + name, # type: Text + has_default, # type: bool + default=None, # type: Optional[Text] + order=None, # type: Optional[Text] + names=None, # type: Optional[Names] + doc=None, # type: Optional[Text] + other_props=None, # type: Optional[Dict[Text, Text]] + ): # type: (...) -> None + # Ensure valid ctor args + if not name: + fail_msg = "Fields must have a non-empty name." + raise SchemaParseException(fail_msg) + elif not isinstance(name, six.string_types): + fail_msg = "The name property must be a string." + raise SchemaParseException(fail_msg) + elif order is not None and order not in VALID_FIELD_SORT_ORDERS: + fail_msg = "The order property %s is not valid." % order + raise SchemaParseException(fail_msg) + + # add members + self._props = {} # type: Dict[Text, Union[Schema, Text, None]] + self._has_default = has_default + self._props.update(other_props or {}) + + if ( + isinstance(atype, six.string_types) + and names is not None + and names.has_name(atype, None) + ): + type_schema = cast(NamedSchema, names.get_name(atype, None)) # type: Schema + else: + try: + type_schema = make_avsc_object(cast(Dict[Text, Text], atype), names) + except Exception as e: + raise SchemaParseException( + 'Type property "%s" not a valid Avro schema: %s' % (atype, e) + ) + self.set_prop("type", type_schema) + self.set_prop("name", name) + self.type = type_schema + self.name = name + # TODO(hammer): check to ensure default is valid + if has_default: + self.set_prop("default", default) + if order is not None: + self.set_prop("order", order) + if doc is not None: + self.set_prop("doc", doc) + + # read-only properties + default = property(lambda self: self.get_prop("default")) + + # utility functions to manipulate properties dict + def get_prop(self, key): # type: (Text) -> Union[Schema, Text, None] + return self._props.get(key) + + def set_prop(self, key, value): + # type: (Text, Union[Schema, Text, None]) -> None + self._props[key] = value + + +# +# Primitive Types +# +class PrimitiveSchema(Schema): + """Valid primitive types are in PRIMITIVE_TYPES.""" + + def __init__(self, atype, other_props=None): + # type: (Text, Optional[Dict[Text, Text]]) -> None + # Ensure valid ctor args + if atype not in PRIMITIVE_TYPES: + raise AvroException("%s is not a valid primitive type." % atype) + + # Call parent ctor + Schema.__init__(self, atype, other_props=other_props) + + self.fullname = atype + + +# +# Complex Types (non-recursive) +# + + +class EnumSchema(NamedSchema): + def __init__( + self, + name, # type: Text + namespace, # type: Text + symbols, # type: List[Text] + names=None, # type: Optional[Names] + doc=None, # type: Optional[Text] + other_props=None, # type: Optional[Dict[Text, Text]] + ): # type: (...) -> None + # Ensure valid ctor args + if not isinstance(symbols, list): + fail_msg = "Enum Schema requires a JSON array for the symbols property." + raise AvroException(fail_msg) + elif False in [isinstance(s, six.string_types) for s in symbols]: + fail_msg = "Enum Schema requires all symbols to be JSON strings." + raise AvroException(fail_msg) + elif len(set(symbols)) < len(symbols): + fail_msg = "Duplicate symbol: %s" % symbols + raise AvroException(fail_msg) + + # Call parent ctor + NamedSchema.__init__(self, "enum", name, namespace, names, other_props) + + # Add class members + self.set_prop("symbols", symbols) + if doc is not None: + self.set_prop("doc", doc) + + # read-only properties + symbols = property(lambda self: self.get_prop("symbols")) + + +# +# Complex Types (recursive) +# + + +class ArraySchema(Schema): + def __init__(self, items, names=None, other_props=None): + # type: (List[Any], Optional[Names], Optional[Dict[Text, Text]]) -> None + # Call parent ctor + Schema.__init__(self, "array", other_props) + # Add class members + + if names is None: + raise SchemaParseException("Must provide Names.") + if isinstance(items, six.string_types) and names.has_name(items, None): + items_schema = cast(Schema, names.get_name(items, None)) + else: + try: + items_schema = make_avsc_object(items, names) + except Exception as err: + raise SchemaParseException( + "Items schema (%s) not a valid Avro schema: %s (known " + "names: %s)" % (items, err, list(names.names.keys())) + ) + + self.set_prop("items", items_schema) + + # read-only properties + items = property(lambda self: self.get_prop("items")) + + +class UnionSchema(Schema): + """ + names is a dictionary of schema objects + """ + + def __init__(self, schemas, names=None): + # type: (List[Schema], Optional[Names]) -> None + # Ensure valid ctor args + if names is None: + raise SchemaParseException("Must provide Names.") + if not isinstance(schemas, list): + fail_msg = "Union schema requires a list of schemas." + raise SchemaParseException(fail_msg) + + # Call parent ctor + Schema.__init__(self, "union") + + # Add class members + schema_objects = [] # type: List[Schema] + for schema in schemas: + if isinstance(schema, six.string_types) and names.has_name(schema, None): + new_schema = cast(Schema, names.get_name(schema, None)) + else: + try: + new_schema = make_avsc_object(schema, names) # type: ignore + except Exception as err: + raise SchemaParseException( + "Union item must be a valid Avro schema: %s" % Text(err) + ) + # check the new schema + if ( + new_schema.type in VALID_TYPES + and new_schema.type not in NAMED_TYPES + and new_schema.type in [schema.type for schema in schema_objects] + ): + raise SchemaParseException("%s type already in Union" % new_schema.type) + elif new_schema.type == "union": + raise SchemaParseException("Unions cannot contain other unions.") + else: + schema_objects.append(new_schema) + self._schemas = schema_objects + + # read-only properties + schemas = property(lambda self: self._schemas) + + +class RecordSchema(NamedSchema): + @staticmethod + def make_field_objects(field_data, names): + # type: (List[Dict[Text, Text]], Names) -> List[Field] + """We're going to need to make message parameters too.""" + field_objects = [] + field_names = [] # type: List[Text] + for field in field_data: + if hasattr(field, "get") and callable(field.get): + atype = cast(Text, field.get("type")) + name = cast(Text, field.get("name")) + + # null values can have a default value of None + has_default = False + default = None + if "default" in field: + has_default = True + default = field.get("default") + + order = field.get("order") + doc = field.get("doc") + other_props = get_other_props(field, FIELD_RESERVED_PROPS) + new_field = Field( + atype, name, has_default, default, order, names, doc, other_props + ) + # make sure field name has not been used yet + if new_field.name in field_names: + fail_msg = "Field name %s already in use." % new_field.name + raise SchemaParseException(fail_msg) + field_names.append(new_field.name) + else: + raise SchemaParseException("Not a valid field: %s" % field) + field_objects.append(new_field) + return field_objects + + def __init__( + self, + name, # type: Text + namespace, # type: Text + fields, # type: List[Dict[Text, Text]] + names=None, # type: Optional[Names] + schema_type="record", # type: Text + doc=None, # type: Optional[Text] + other_props=None, # type: Optional[Dict[Text, Text]] + ): # type: (...) -> None + # Ensure valid ctor args + if fields is None: + fail_msg = "Record schema requires a non-empty fields property." + raise SchemaParseException(fail_msg) + elif not isinstance(fields, list): + fail_msg = "Fields property must be a list of Avro schemas." + raise SchemaParseException(fail_msg) + if names is None: + raise SchemaParseException("Must provide Names.") + + # Call parent ctor (adds own name to namespace, too) + NamedSchema.__init__(self, schema_type, name, namespace, names, other_props) + + if schema_type == "record": + old_default = names.default_namespace + names.default_namespace = Name( + name, namespace, names.default_namespace + ).get_space() + + # Add class members + field_objects = RecordSchema.make_field_objects(fields, names) + self.set_prop("fields", field_objects) + if doc is not None: + self.set_prop("doc", doc) + + if schema_type == "record": + names.default_namespace = old_default + + # read-only properties + fields = property(lambda self: self.get_prop("fields")) + + +# +# Module Methods +# +def get_other_props(all_props, reserved_props): + # type: (Dict[Text, Text], Tuple[Text, ...]) -> Optional[Dict[Text, Text]] + """ + Retrieve the non-reserved properties from a dictionary of properties + @args reserved_props: The set of reserved properties to exclude + """ + if hasattr(all_props, "items") and callable(all_props.items): + return dict( + [(k, v) for (k, v) in list(all_props.items()) if k not in reserved_props] + ) + return None + + +def make_avsc_object(json_data, names=None): + # type: (Union[Dict[Text, Text], List[Any], Text], Optional[Names]) -> Schema + """ + Build Avro Schema from data parsed out of JSON string. + + @arg names: A Name object (tracks seen names and default space) + """ + if names is None: + names = Names() + assert isinstance(names, Names) + + # JSON object (non-union) + if hasattr(json_data, "get") and callable(json_data.get): # type: ignore + assert isinstance(json_data, Dict) + atype = cast(Text, json_data.get("type")) + other_props = get_other_props(json_data, SCHEMA_RESERVED_PROPS) + if atype in PRIMITIVE_TYPES: + return PrimitiveSchema(atype, other_props) + if atype in NAMED_TYPES: + name = cast(Text, json_data.get("name")) + namespace = cast(Text, json_data.get("namespace", names.default_namespace)) + if atype == "enum": + symbols = cast(List[Text], json_data.get("symbols")) + doc = json_data.get("doc") + return EnumSchema(name, namespace, symbols, names, doc, other_props) + if atype in ["record", "error"]: + fields = cast(List[Dict[Text, Text]], json_data.get("fields")) + doc = json_data.get("doc") + return RecordSchema( + name, namespace, fields, names, atype, doc, other_props + ) + raise SchemaParseException("Unknown Named Type: %s" % atype) + if atype in VALID_TYPES: + if atype == "array": + items = cast(List[Text], json_data.get("items")) + return ArraySchema(items, names, other_props) + if atype is None: + raise SchemaParseException('No "type" property: %s' % json_data) + raise SchemaParseException("Undefined type: %s" % atype) + # JSON array (union) + if isinstance(json_data, list): + return UnionSchema(json_data, names) + # JSON string (primitive) + if json_data in PRIMITIVE_TYPES: + return PrimitiveSchema(cast(Text, json_data)) + # not for us! + fail_msg = "Could not make an Avro Schema object from %s." % json_data + raise SchemaParseException(fail_msg)