comparison env/lib/python3.7/site-packages/schema_salad/avro/schema.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # Modifications copyright (C) 2017-2018 Common Workflow Language.
17 """
18 Contains the Schema classes.
19
20 A schema may be one of:
21 A record, mapping field names to field value data;
22 An enum, containing one of a small set of symbols;
23 An array of values, all of the same schema;
24 A union of other schemas;
25 A unicode string;
26 A 32-bit signed int;
27 A 64-bit signed long;
28 A 32-bit floating-point float;
29 A 64-bit floating-point double;
30 A boolean; or
31 Null.
32 """
33 from typing import Any, Dict, List, Optional, Text, Tuple, Union, cast
34
35 from schema_salad.exceptions import SchemaException
36
37 import six
38
39 #
40 # Constants
41 #
42
# Type keywords that need no further attributes.
PRIMITIVE_TYPES = (
    "null",
    "boolean",
    "string",
    "int",
    "long",
    "float",
    "double",
)

# Type keywords that are registered under a (namespaced) name.
NAMED_TYPES = ("enum", "record")

# Every type keyword accepted by Schema.__init__.
VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + ("array", "union")

# Keys consumed directly when building a schema; any other key on a schema
# object is carried through as an "other" property.
SCHEMA_RESERVED_PROPS = (
    "type",
    "name",
    "namespace",
    "fields",  # Record
    "items",  # Array
    "symbols",  # Enum
    "doc",
)

# Keys consumed directly when building a Field; any other key is carried
# through as an "other" property.
FIELD_RESERVED_PROPS = ("default", "name", "doc", "order", "type")

# Accepted values for a field's "order" attribute.
VALID_FIELD_SORT_ORDERS = ("ascending", "descending", "ignore")
62
63 #
64 # Exceptions
65 #
66
67
class AvroException(SchemaException):
    """Base error for the Avro schema classes in this module."""
70
71
class SchemaParseException(AvroException):
    """Raised when schema data cannot be turned into a valid Schema object."""
74
75
76 #
77 # Base Classes
78 #
79
80
class Schema(object):
    """Base class for all Schema classes.

    Each schema keeps its JSON-level properties in ``_props``; the "type"
    property is always set and is mirrored on ``self.type``.
    """

    def __init__(self, atype, other_props=None):
        # type: (Text, Optional[Dict[Text, Any]]) -> None
        """
        Validate and record the schema type.

        @arg atype: the schema type keyword; must be one of VALID_TYPES.
        @arg other_props: extra (non-reserved) properties to store.
        @raise SchemaParseException: if atype is not a string or is not a
            recognised type keyword.
        """
        # Ensure valid ctor args
        if not isinstance(atype, six.string_types):
            raise SchemaParseException(
                "Schema type '{}' must be a string, was '{}'.".format(
                    atype, type(atype)
                )
            )
        elif atype not in VALID_TYPES:
            fail_msg = "%s is not a valid type." % atype
            raise SchemaParseException(fail_msg)

        # add members
        # Keep an existing _props dict if one was already created before
        # this constructor ran, instead of discarding it.
        if not hasattr(self, "_props"):
            self._props = {}  # type: Dict[Text, Any]
        self.set_prop("type", atype)
        self.type = atype
        self._props.update(other_props or {})

    # Read-only properties dict. Printing schemas
    # creates JSON properties directly from this dict.
    props = property(lambda self: self._props)

    # utility functions to manipulate properties dict
    def get_prop(self, key):  # type: (Text) -> Any
        """Return the property stored under key, or None if it is unset."""
        return self._props.get(key)

    def set_prop(self, key, value):  # type: (Text, Any) -> None
        """Store value under key in the properties dict."""
        self._props[key] = value
112
113
class Name(object):
    """Class to describe Avro name."""

    def __init__(self, name_attr, space_attr, default_space):
        # type: (Text, Optional[Text], Optional[Text]) -> None
        """
        Formulate full name according to the specification.

        @arg name_attr: name value read in schema or None.
        @arg space_attr: namespace value read in schema or None.
        @arg default_space: the current default space or None.
        @raise SchemaParseException: if name_attr is "" or not a string/None,
            or if space_attr/default_space is not a string/None.

        An empty space_attr or default_space is deliberately accepted and
        treated exactly like None (no namespace). get_space() returns "" for
        a name without a namespace, and callers may pass that value back in
        as default_space, so rejecting "" here would break them.
        """
        # Ensure valid ctor args
        if not (isinstance(name_attr, six.string_types) or (name_attr is None)):
            fail_msg = "Name must be non-empty string or None."
            raise SchemaParseException(fail_msg)
        elif name_attr == "":
            fail_msg = "Name must be non-empty string or None."
            raise SchemaParseException(fail_msg)

        if not (isinstance(space_attr, six.string_types) or (space_attr is None)):
            fail_msg = "Space must be non-empty string or None."
            raise SchemaParseException(fail_msg)

        if not (isinstance(default_space, six.string_types) or (default_space is None)):
            fail_msg = "Default space must be non-empty string or None."
            raise SchemaParseException(fail_msg)

        self._full = None  # type: Optional[Text]

        # name_attr == "" was rejected above, so only None remains to skip.
        if name_attr is None:
            return

        if "." in name_attr:
            # Already fully qualified: any namespace is ignored.
            self._full = name_attr
        elif space_attr:
            # Explicit namespace takes precedence over the default one.
            self._full = "%s.%s" % (space_attr, name_attr)
        elif default_space:
            self._full = "%s.%s" % (default_space, name_attr)
        else:
            self._full = name_attr

    fullname = property(lambda self: self._full)

    def get_space(self):
        # type: () -> Optional[Text]
        """Back out a namespace from full name.

        Returns None when there is no full name, the part before the last
        "." when a "." occurs after the first character, and "" otherwise.
        """
        if self._full is None:
            return None

        if self._full.find(".") > 0:
            return self._full.rsplit(".", 1)[0]
        else:
            return ""
176
177
class Names(object):
    """Registry of named schemas plus the namespace used for unqualified names."""

    def __init__(self, default_namespace=None):
        # type: (Optional[Text]) -> None
        # Maps each full name to the schema registered under it.
        self.names = {}  # type: Dict[Text, NamedSchema]
        self.default_namespace = default_namespace

    def _fullname_of(self, name_attr, space_attr):
        # type: (Text, Optional[Text]) -> Optional[Text]
        """Qualify name_attr with space_attr or the current default namespace."""
        return Name(name_attr, space_attr, self.default_namespace).fullname

    def has_name(self, name_attr, space_attr):
        # type: (Text, Optional[Text]) -> bool
        """Report whether the qualified name is already registered."""
        return self._fullname_of(name_attr, space_attr) in self.names

    def get_name(self, name_attr, space_attr):
        # type: (Text, Optional[Text]) -> Optional[NamedSchema]
        """Return the schema registered under the qualified name, or None."""
        return self.names.get(self._fullname_of(name_attr, space_attr))

    def add_name(self, name_attr, space_attr, new_schema):
        # type: (Text, Optional[Text], NamedSchema) -> Name
        """
        Add a new schema object to the name set.

        @arg name_attr: name value read in schema
        @arg space_attr: namespace value read in schema.
        @arg new_schema: schema to register under the qualified name.

        @return: the Name that was just added.
        @raise SchemaParseException: if the full name is a reserved type
            keyword or is already registered.
        """
        candidate = Name(name_attr, space_attr, self.default_namespace)
        qualified = candidate.fullname

        if qualified in VALID_TYPES:
            raise SchemaParseException("%s is a reserved type name." % qualified)
        if qualified in self.names:
            raise SchemaParseException('The name "%s" is already in use.' % qualified)

        self.names[qualified] = new_schema
        return candidate
219
220
class NamedSchema(Schema):
    """Named Schemas specified in NAMED_TYPES."""

    def __init__(
        self,
        atype,  # type: Text
        name,  # type: Text
        namespace=None,  # type: Optional[Text]
        names=None,  # type: Optional[Names]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        """
        Validate the name, build the base schema and register it in names.

        @raise SchemaParseException: for an empty or non-string name, a
            non-string namespace, a missing names registry, or when the
            registry refuses the qualified name.
        """
        # Validate constructor arguments before touching any state.
        if not name:
            raise SchemaParseException("Named Schemas must have a non-empty name.")
        if not isinstance(name, six.string_types):
            raise SchemaParseException("The name property must be a string.")
        if namespace is not None and not isinstance(namespace, six.string_types):
            raise SchemaParseException("The namespace property must be a string.")
        if names is None:
            raise SchemaParseException("Must provide Names.")

        Schema.__init__(self, atype, other_props)

        registered = names.add_name(name, namespace, self)

        # The name is stored as written; the namespace (when one was given)
        # is stored as derived from the registered full name.
        self.set_prop("name", name)
        if namespace is not None:
            self.set_prop("namespace", registered.get_space())

        # Full name as computed from name, namespace and default namespace.
        self._fullname = registered.fullname

    # read-only properties
    name = property(lambda self: self.get_prop("name"))
261
262
class Field(object):
    """One field of a record schema, holding its type schema and attributes."""

    def __init__(
        self,
        atype,  # type: Union[Text, Dict[Text, Text]]
        name,  # type: Text
        has_default,  # type: bool
        default=None,  # type: Optional[Text]
        order=None,  # type: Optional[Text]
        names=None,  # type: Optional[Names]
        doc=None,  # type: Optional[Text]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        """
        Build a field.

        @arg atype: the name of an already-registered named type, or schema
            data handed to make_avsc_object.
        @arg has_default: whether a default was supplied; "default" is only
            stored when this is true (the default itself may be None).
        @raise SchemaParseException: for an empty/non-string name, an order
            outside VALID_FIELD_SORT_ORDERS, or an unparseable type.
        """
        if not name:
            raise SchemaParseException("Fields must have a non-empty name.")
        if not isinstance(name, six.string_types):
            raise SchemaParseException("The name property must be a string.")
        if order is not None and order not in VALID_FIELD_SORT_ORDERS:
            raise SchemaParseException("The order property %s is not valid." % order)

        self._props = {}  # type: Dict[Text, Union[Schema, Text, None]]
        self._has_default = has_default
        self._props.update(other_props or {})

        if (
            isinstance(atype, six.string_types)
            and names is not None
            and names.has_name(atype, None)
        ):
            # Reference to a named type that was defined earlier.
            type_schema = cast(NamedSchema, names.get_name(atype, None))  # type: Schema
        else:
            try:
                type_schema = make_avsc_object(cast(Dict[Text, Text], atype), names)
            except Exception as e:
                raise SchemaParseException(
                    'Type property "%s" not a valid Avro schema: %s' % (atype, e)
                )

        self.set_prop("type", type_schema)
        self.set_prop("name", name)
        self.type = type_schema
        self.name = name
        # TODO(hammer): check to ensure default is valid
        if has_default:
            self.set_prop("default", default)
        # Optional attributes are recorded only when supplied, in this order.
        for attr_key, attr_value in (("order", order), ("doc", doc)):
            if attr_value is not None:
                self.set_prop(attr_key, attr_value)

    # read-only properties
    default = property(lambda self: self.get_prop("default"))

    # utility functions to manipulate properties dict
    def get_prop(self, key):  # type: (Text) -> Union[Schema, Text, None]
        """Return the property stored under key, or None if it is unset."""
        return self._props.get(key)

    def set_prop(self, key, value):
        # type: (Text, Union[Schema, Text, None]) -> None
        """Store value under key in the properties dict."""
        self._props[key] = value
326
327
328 #
329 # Primitive Types
330 #
class PrimitiveSchema(Schema):
    """Valid primitive types are in PRIMITIVE_TYPES."""

    def __init__(self, atype, other_props=None):
        # type: (Text, Optional[Dict[Text, Text]]) -> None
        """
        Build a primitive schema.

        @raise AvroException: if atype is not one of PRIMITIVE_TYPES.
        """
        if atype in PRIMITIVE_TYPES:
            Schema.__init__(self, atype, other_props=other_props)
            # A primitive type name is its own full name.
            self.fullname = atype
            return
        raise AvroException("%s is not a valid primitive type." % atype)
344
345
346 #
347 # Complex Types (non-recursive)
348 #
349
350
class EnumSchema(NamedSchema):
    """Named enum schema whose values come from a list of string symbols."""

    def __init__(
        self,
        name,  # type: Text
        namespace,  # type: Text
        symbols,  # type: List[Text]
        names=None,  # type: Optional[Names]
        doc=None,  # type: Optional[Text]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        """
        Validate the symbols, then build and register the enum.

        @arg symbols: list of distinct string symbols.
        @arg doc: optional documentation, stored as the "doc" property.
        @raise AvroException: if symbols is not a list, contains a
            non-string, or contains duplicates.
        """
        # Ensure valid ctor args (checked before the name is registered)
        if not isinstance(symbols, list):
            fail_msg = "Enum Schema requires a JSON array for the symbols property."
            raise AvroException(fail_msg)
        elif not all(isinstance(s, six.string_types) for s in symbols):
            fail_msg = "Enum Schema requires all symbols to be JSON strings."
            raise AvroException(fail_msg)
        elif len(set(symbols)) < len(symbols):
            fail_msg = "Duplicate symbol: %s" % symbols
            raise AvroException(fail_msg)

        # Call parent ctor
        NamedSchema.__init__(self, "enum", name, namespace, names, other_props)

        # Add class members
        self.set_prop("symbols", symbols)
        if doc is not None:
            self.set_prop("doc", doc)

    # read-only properties
    symbols = property(lambda self: self.get_prop("symbols"))
382
383
384 #
385 # Complex Types (recursive)
386 #
387
388
class ArraySchema(Schema):
    """Array schema whose elements all share a single items schema."""

    def __init__(self, items, names=None, other_props=None):
        # type: (List[Any], Optional[Names], Optional[Dict[Text, Text]]) -> None
        """
        Build an array schema.

        @arg items: the name of an already-registered named type, or schema
            data handed to make_avsc_object.
        @raise SchemaParseException: if names is missing or items cannot be
            parsed (the message lists the currently known names).
        """
        Schema.__init__(self, "array", other_props)

        if names is None:
            raise SchemaParseException("Must provide Names.")

        if isinstance(items, six.string_types) and names.has_name(items, None):
            element_schema = cast(Schema, names.get_name(items, None))
        else:
            try:
                element_schema = make_avsc_object(items, names)
            except Exception as err:
                known_names = list(names.names.keys())
                raise SchemaParseException(
                    "Items schema (%s) not a valid Avro schema: %s (known "
                    "names: %s)" % (items, err, known_names)
                )

        self.set_prop("items", element_schema)

    # read-only properties
    items = property(lambda self: self.get_prop("items"))
413
414
class UnionSchema(Schema):
    """
    Union of member schemas.

    names is the Names registry used to resolve and register the members.
    """

    def __init__(self, schemas, names=None):
        # type: (List[Schema], Optional[Names]) -> None
        """
        Build a union from a list of member schema definitions.

        @raise SchemaParseException: if names is missing, schemas is not a
            list, a member cannot be parsed, an unnamed type appears twice,
            or a member is itself a union.
        """
        if names is None:
            raise SchemaParseException("Must provide Names.")
        if not isinstance(schemas, list):
            raise SchemaParseException("Union schema requires a list of schemas.")

        Schema.__init__(self, "union")

        members = []  # type: List[Schema]
        for candidate in schemas:
            if isinstance(candidate, six.string_types) and names.has_name(candidate, None):
                member = cast(Schema, names.get_name(candidate, None))
            else:
                try:
                    member = make_avsc_object(candidate, names)  # type: ignore
                except Exception as err:
                    raise SchemaParseException(
                        "Union item must be a valid Avro schema: %s" % Text(err)
                    )
            # Unnamed types may appear only once; named types may repeat.
            if (
                member.type in VALID_TYPES
                and member.type not in NAMED_TYPES
                and member.type in [existing.type for existing in members]
            ):
                raise SchemaParseException("%s type already in Union" % member.type)
            if member.type == "union":
                raise SchemaParseException("Unions cannot contain other unions.")
            members.append(member)
        self._schemas = members

    # read-only properties
    schemas = property(lambda self: self._schemas)
459
460
class RecordSchema(NamedSchema):
    """Named record schema holding an ordered list of Field objects."""

    @staticmethod
    def make_field_objects(field_data, names):
        # type: (List[Dict[Text, Text]], Names) -> List[Field]
        """
        Build Field objects from a list of field definitions.

        We're going to need to make message parameters too.

        @raise SchemaParseException: if an entry has no callable ``get`` or
            a field name is used twice.
        """
        field_objects = []  # type: List[Field]
        field_names = []  # type: List[Text]
        for field in field_data:
            if not (hasattr(field, "get") and callable(field.get)):
                # Wrap in a 1-tuple: a tuple-valued entry would otherwise be
                # consumed as the format arguments and raise TypeError.
                raise SchemaParseException("Not a valid field: %s" % (field,))

            atype = cast(Text, field.get("type"))
            name = cast(Text, field.get("name"))

            # null values can have a default value of None, so the presence
            # of the key (not its value) decides whether a default exists.
            has_default = "default" in field
            default = field.get("default") if has_default else None

            order = field.get("order")
            doc = field.get("doc")
            other_props = get_other_props(field, FIELD_RESERVED_PROPS)
            new_field = Field(
                atype, name, has_default, default, order, names, doc, other_props
            )
            # make sure field name has not been used yet
            if new_field.name in field_names:
                fail_msg = "Field name %s already in use." % new_field.name
                raise SchemaParseException(fail_msg)
            field_names.append(new_field.name)
            field_objects.append(new_field)
        return field_objects

    def __init__(
        self,
        name,  # type: Text
        namespace,  # type: Text
        fields,  # type: List[Dict[Text, Text]]
        names=None,  # type: Optional[Names]
        schema_type="record",  # type: Text
        doc=None,  # type: Optional[Text]
        other_props=None,  # type: Optional[Dict[Text, Text]]
    ):  # type: (...) -> None
        """
        Build and register a record schema, then parse its fields.

        @raise SchemaParseException: if fields is None or not a list, names
            is missing, or any field fails to parse.
        """
        # Ensure valid ctor args
        if fields is None:
            fail_msg = "Record schema requires a non-empty fields property."
            raise SchemaParseException(fail_msg)
        elif not isinstance(fields, list):
            fail_msg = "Fields property must be a list of Avro schemas."
            raise SchemaParseException(fail_msg)
        if names is None:
            raise SchemaParseException("Must provide Names.")

        # Call parent ctor (adds own name to namespace, too)
        NamedSchema.__init__(self, schema_type, name, namespace, names, other_props)

        # While the fields are parsed, names without an explicit namespace
        # are qualified with this record's namespace.
        scoped = schema_type == "record"
        old_default = names.default_namespace
        if scoped:
            names.default_namespace = Name(
                name, namespace, names.default_namespace
            ).get_space()

        try:
            # Add class members
            field_objects = RecordSchema.make_field_objects(fields, names)
            self.set_prop("fields", field_objects)
            if doc is not None:
                self.set_prop("doc", doc)
        finally:
            # Restore the caller's default namespace even when a field fails
            # to parse, so a reused Names object is not left scoped here.
            if scoped:
                names.default_namespace = old_default

    # read-only properties
    fields = property(lambda self: self.get_prop("fields"))
536
537
538 #
539 # Module Methods
540 #
def get_other_props(all_props, reserved_props):
    # type: (Dict[Text, Text], Tuple[Text, ...]) -> Optional[Dict[Text, Text]]
    """
    Retrieve the non-reserved properties from a dictionary of properties.

    @arg all_props: the property mapping; anything without a callable
        ``items`` attribute yields None.
    @arg reserved_props: The set of reserved properties to exclude.
    @return: a new dict of the remaining key/value pairs (in iteration
        order), or None when all_props has no callable ``items``.
    """
    items = getattr(all_props, "items", None)
    if not callable(items):
        return None
    return {k: v for k, v in items() if k not in reserved_props}
552
553
def make_avsc_object(json_data, names=None):
    # type: (Union[Dict[Text, Text], List[Any], Text], Optional[Names]) -> Schema
    """
    Build Avro Schema from data parsed out of JSON string.

    @arg json_data: a mapping (primitive, enum, record or array object), a
        list (union), or a primitive type name.
    @arg names: A Names object (tracks seen names and default space); a
        fresh one is created when omitted.
    @return: the Schema built from json_data.
    @raise SchemaParseException: if json_data cannot be turned into a
        schema.
    """
    if names is None:
        names = Names()
    assert isinstance(names, Names)

    # JSON object (non-union)
    if hasattr(json_data, "get") and callable(json_data.get):  # type: ignore
        assert isinstance(json_data, Dict)
        atype = cast(Text, json_data.get("type"))
        other_props = get_other_props(json_data, SCHEMA_RESERVED_PROPS)
        if atype in PRIMITIVE_TYPES:
            return PrimitiveSchema(atype, other_props)
        if atype in NAMED_TYPES:
            name = cast(Text, json_data.get("name"))
            namespace = cast(Text, json_data.get("namespace", names.default_namespace))
            if atype == "enum":
                symbols = cast(List[Text], json_data.get("symbols"))
                doc = json_data.get("doc")
                return EnumSchema(name, namespace, symbols, names, doc, other_props)
            if atype in ["record", "error"]:
                fields = cast(List[Dict[Text, Text]], json_data.get("fields"))
                doc = json_data.get("doc")
                return RecordSchema(
                    name, namespace, fields, names, atype, doc, other_props
                )
            raise SchemaParseException("Unknown Named Type: %s" % (atype,))
        if atype in VALID_TYPES:
            if atype == "array":
                items = cast(List[Text], json_data.get("items"))
                return ArraySchema(items, names, other_props)
        if atype is None:
            raise SchemaParseException('No "type" property: %s' % (json_data,))
        # Operands are wrapped in 1-tuples so tuple values are formatted
        # instead of being consumed as format arguments (TypeError).
        raise SchemaParseException("Undefined type: %s" % (atype,))
    # JSON array (union)
    if isinstance(json_data, list):
        return UnionSchema(json_data, names)
    # JSON string (primitive)
    if json_data in PRIMITIVE_TYPES:
        return PrimitiveSchema(cast(Text, json_data))
    # not for us!
    fail_msg = "Could not make an Avro Schema object from %s." % (json_data,)
    raise SchemaParseException(fail_msg)