comparison env/lib/python3.7/site-packages/schema_salad/python_codegen.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """Python code generator for a given schema salad definition."""
2 from typing import IO, Any, Dict, List, MutableMapping, MutableSequence, Union
3
4 from pkg_resources import resource_stream
5 from six import iteritems, itervalues, text_type
6 from io import StringIO
7 from typing_extensions import Text # pylint: disable=unused-import
8
9 from . import schema
10 from .exceptions import SchemaException
11 from .codegen_base import CodeGenBase, TypeDef
12 from .schema import shortname
13
14 # move to a regular typing import when Python 3.3-3.6 is no longer supported
15
16
17 class PythonCodeGen(CodeGenBase):
18 """Generation of Python code for a given Schema Salad definition."""
19
20 def __init__(self, out):
21 # type: (IO[Text]) -> None
22 super(PythonCodeGen, self).__init__()
23 self.out = out
24 self.current_class_is_abstract = False
25 self.serializer = StringIO()
26 self.idfield = u""
27
28 @staticmethod
29 def safe_name(name): # type: (Text) -> Text
30 avn = schema.avro_name(name)
31 if avn in ("class", "in"):
32 # reserved words
33 avn = avn + "_"
34 return avn
35
36 def prologue(self):
37 # type: () -> None
38
39 self.out.write(
40 u"""#
41 # This file was autogenerated using schema-salad-tool --codegen=python
42 # The code itself is released under the Apache 2.0 license and the help text is
43 # subject to the license of the original schema.
44 #
45 """
46 )
47
48 stream = resource_stream(__name__, "python_codegen_support.py")
49 self.out.write(stream.read().decode("UTF-8"))
50 stream.close()
51 self.out.write(u"\n\n")
52
53 for primative in itervalues(self.prims):
54 self.declare_type(primative)
55
56 def begin_class(
57 self, # pylint: disable=too-many-arguments
58 classname, # type: Text
59 extends, # type: MutableSequence[Text]
60 doc, # type: Text
61 abstract, # type: bool
62 field_names, # type: MutableSequence[Text]
63 idfield, # type: Text
64 ): # type: (...) -> None
65 classname = self.safe_name(classname)
66
67 if extends:
68 ext = ", ".join(self.safe_name(e) for e in extends)
69 else:
70 ext = "Savable"
71
72 self.out.write(u"class {}({}):\n".format(classname, ext))
73
74 if doc:
75 self.out.write(u' """\n')
76 self.out.write(text_type(doc))
77 self.out.write(u'\n """\n')
78
79 self.serializer = StringIO()
80
81 self.current_class_is_abstract = abstract
82 if self.current_class_is_abstract:
83 self.out.write(u" pass\n\n\n")
84 return
85
86 safe_inits = [" self,"] # type: List[Text]
87 safe_inits.extend(
88 [
89 " {}, # type: Any".format(self.safe_name(f))
90 for f in field_names
91 if f != "class"
92 ]
93 )
94 self.out.write(
95 u" def __init__(\n"
96 + u"\n".join(safe_inits)
97 + u"\n extension_fields=None, "
98 + u"# type: Optional[Dict[Text, Any]]"
99 + u"\n loadingOptions=None # type: Optional[LoadingOptions]"
100 + u"\n ): # type: (...) -> None\n"
101 + u"""
102 if extension_fields:
103 self.extension_fields = extension_fields
104 else:
105 self.extension_fields = yaml.comments.CommentedMap()
106 if loadingOptions:
107 self.loadingOptions = loadingOptions
108 else:
109 self.loadingOptions = LoadingOptions()
110 """
111 )
112 field_inits = u""
113 for name in field_names:
114 if name == "class":
115 field_inits += u""" self.class_ = "{}"
116 """.format(
117 classname
118 )
119 else:
120 field_inits += u""" self.{0} = {0}
121 """.format(
122 self.safe_name(name)
123 )
124 self.out.write(
125 field_inits
126 + u"""
127 @classmethod
128 def fromDoc(cls, doc, baseuri, loadingOptions, docRoot=None):
129 # type: (Any, Text, LoadingOptions, Optional[Text]) -> {}
130
131 _doc = copy.copy(doc)
132 if hasattr(doc, 'lc'):
133 _doc.lc.data = doc.lc.data
134 _doc.lc.filename = doc.lc.filename
135 _errors__ = []
136 """.format(
137 classname
138 )
139 )
140
141 self.idfield = idfield
142
143 self.serializer.write(
144 u"""
145 def save(self, top=False, base_url="", relative_uris=True):
146 # type: (bool, Text, bool) -> Dict[Text, Any]
147 r = yaml.comments.CommentedMap() # type: Dict[Text, Any]
148 for ef in self.extension_fields:
149 r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef]
150 """
151 )
152
153 if "class" in field_names:
154 self.out.write(
155 u"""
156 if _doc.get('class') != '{class_}':
157 raise ValidationException("Not a {class_}")
158
159 """.format(
160 class_=classname
161 )
162 )
163
164 self.serializer.write(
165 u"""
166 r['class'] = '{class_}'
167 """.format(
168 class_=classname
169 )
170 )
171
172 def end_class(self, classname, field_names):
173 # type: (Text, List[Text]) -> None
174
175 if self.current_class_is_abstract:
176 return
177
178 self.out.write(
179 u"""
180 extension_fields = yaml.comments.CommentedMap()
181 for k in _doc.keys():
182 if k not in cls.attrs:
183 if ":" in k:
184 ex = expand_url(k,
185 u"",
186 loadingOptions,
187 scoped_id=False,
188 vocab_term=False)
189 extension_fields[ex] = _doc[k]
190 else:
191 _errors__.append(
192 ValidationException(
193 "invalid field `%s`, expected one of: {attrstr}" % (k),
194 SourceLine(_doc, k, str)
195 )
196 )
197 break
198
199 if _errors__:
200 raise ValidationException(\"Trying '{class_}'\", None, _errors__)
201 """.format(
202 attrstr=", ".join(["`{}`".format(f) for f in field_names]),
203 class_=self.safe_name(classname),
204 )
205 )
206
207 self.serializer.write(
208 u"""
209 if top and self.loadingOptions.namespaces:
210 r["$namespaces"] = self.loadingOptions.namespaces
211
212 """
213 )
214
215 self.serializer.write(u" return r\n\n")
216
217 self.serializer.write(
218 u" attrs = frozenset({attrs})\n".format(attrs=field_names)
219 )
220
221 safe_inits = [
222 self.safe_name(f) for f in field_names if f != "class"
223 ] # type: List[Text]
224
225 safe_inits.extend(
226 ["extension_fields=extension_fields", "loadingOptions=loadingOptions"]
227 )
228
229 self.out.write(
230 u""" loadingOptions = copy.deepcopy(loadingOptions)
231 loadingOptions.original_doc = _doc
232 """
233 )
234 self.out.write(u" return cls(" + ", ".join(safe_inits) + ")\n")
235
236 self.out.write(text_type(self.serializer.getvalue()))
237
238 self.out.write(u"\n\n")
239
240 prims = {
241 u"http://www.w3.org/2001/XMLSchema#string": TypeDef(
242 "strtype", "_PrimitiveLoader((str, text_type))"
243 ),
244 u"http://www.w3.org/2001/XMLSchema#int": TypeDef(
245 "inttype", "_PrimitiveLoader(int)"
246 ),
247 u"http://www.w3.org/2001/XMLSchema#long": TypeDef(
248 "inttype", "_PrimitiveLoader(int)"
249 ),
250 u"http://www.w3.org/2001/XMLSchema#float": TypeDef(
251 "floattype", "_PrimitiveLoader(float)"
252 ),
253 u"http://www.w3.org/2001/XMLSchema#double": TypeDef(
254 "floattype", "_PrimitiveLoader(float)"
255 ),
256 u"http://www.w3.org/2001/XMLSchema#boolean": TypeDef(
257 "booltype", "_PrimitiveLoader(bool)"
258 ),
259 u"https://w3id.org/cwl/salad#null": TypeDef(
260 "None_type", "_PrimitiveLoader(type(None))"
261 ),
262 u"https://w3id.org/cwl/salad#Any": TypeDef("Any_type", "_AnyLoader()"),
263 }
264
265 def type_loader(self, type_declaration):
266 # type: (Union[List[Any], Dict[Text, Any], Text]) -> TypeDef
267
268 if isinstance(type_declaration, MutableSequence):
269 sub = [self.type_loader(i) for i in type_declaration]
270 return self.declare_type(
271 TypeDef(
272 "union_of_{}".format("_or_".join(s.name for s in sub)),
273 "_UnionLoader(({},))".format(", ".join(s.name for s in sub)),
274 )
275 )
276 if isinstance(type_declaration, MutableMapping):
277 if type_declaration["type"] in (
278 "array",
279 "https://w3id.org/cwl/salad#array",
280 ):
281 i = self.type_loader(type_declaration["items"])
282 return self.declare_type(
283 TypeDef(
284 "array_of_{}".format(i.name), "_ArrayLoader({})".format(i.name)
285 )
286 )
287 if type_declaration["type"] in ("enum", "https://w3id.org/cwl/salad#enum"):
288 for sym in type_declaration["symbols"]:
289 self.add_vocab(shortname(sym), sym)
290 return self.declare_type(
291 TypeDef(
292 self.safe_name(type_declaration["name"]) + "Loader",
293 '_EnumLoader(("{}",))'.format(
294 '", "'.join(
295 self.safe_name(sym)
296 for sym in type_declaration["symbols"]
297 )
298 ),
299 )
300 )
301 if type_declaration["type"] in (
302 "record",
303 "https://w3id.org/cwl/salad#record",
304 ):
305 return self.declare_type(
306 TypeDef(
307 self.safe_name(type_declaration["name"]) + "Loader",
308 "_RecordLoader({})".format(
309 self.safe_name(type_declaration["name"])
310 ),
311 )
312 )
313 raise SchemaException("wft {}".format(type_declaration["type"]))
314 if type_declaration in self.prims:
315 return self.prims[type_declaration]
316 return self.collected_types[self.safe_name(type_declaration) + "Loader"]
317
318 def declare_id_field(self, name, fieldtype, doc, optional):
319 # type: (Text, TypeDef, Text, bool) -> None
320
321 if self.current_class_is_abstract:
322 return
323
324 self.declare_field(name, fieldtype, doc, True)
325
326 if optional:
327 opt = """{safename} = "_:" + str(_uuid__.uuid4())""".format(
328 safename=self.safe_name(name)
329 )
330 else:
331 opt = """raise ValidationException("Missing {fieldname}")""".format(
332 fieldname=shortname(name)
333 )
334
335 self.out.write(
336 u"""
337 if {safename} is None:
338 if docRoot is not None:
339 {safename} = docRoot
340 else:
341 {opt}
342 baseuri = {safename}
343 """.format(
344 safename=self.safe_name(name), fieldname=shortname(name), opt=opt
345 )
346 )
347
348 def declare_field(self, name, fieldtype, doc, optional):
349 # type: (Text, TypeDef, Text, bool) -> None
350
351 if self.current_class_is_abstract:
352 return
353
354 if shortname(name) == "class":
355 return
356
357 if optional:
358 self.out.write(
359 u" if '{fieldname}' in _doc:\n".format(fieldname=shortname(name))
360 )
361 spc = " "
362 else:
363 spc = ""
364 self.out.write(
365 u"""{spc} try:
366 {spc} {safename} = load_field(_doc.get(
367 {spc} '{fieldname}'), {fieldtype}, baseuri, loadingOptions)
368 {spc} except ValidationException as e:
369 {spc} _errors__.append(
370 {spc} ValidationException(
371 {spc} \"the `{fieldname}` field is not valid because:\",
372 {spc} SourceLine(_doc, '{fieldname}', str),
373 {spc} [e]
374 {spc} )
375 {spc} )
376 """.format(
377 safename=self.safe_name(name),
378 fieldname=shortname(name),
379 fieldtype=fieldtype.name,
380 spc=spc,
381 )
382 )
383 if optional:
384 self.out.write(
385 u""" else:
386 {safename} = None
387 """.format(
388 safename=self.safe_name(name)
389 )
390 )
391
392 if name == self.idfield or not self.idfield:
393 baseurl = "base_url"
394 else:
395 baseurl = "self.{}".format(self.safe_name(self.idfield))
396
397 if fieldtype.is_uri:
398 self.serializer.write(
399 u"""
400 if self.{safename} is not None:
401 u = save_relative_uri(
402 self.{safename},
403 {baseurl},
404 {scoped_id},
405 {ref_scope},
406 relative_uris)
407 if u:
408 r['{fieldname}'] = u
409 """.format(
410 safename=self.safe_name(name),
411 fieldname=shortname(name).strip(),
412 baseurl=baseurl,
413 scoped_id=fieldtype.scoped_id,
414 ref_scope=fieldtype.ref_scope,
415 )
416 )
417 else:
418 self.serializer.write(
419 u"""
420 if self.{safename} is not None:
421 r['{fieldname}'] = save(
422 self.{safename},
423 top=False,
424 base_url={baseurl},
425 relative_uris=relative_uris)
426 """.format(
427 safename=self.safe_name(name),
428 fieldname=shortname(name),
429 baseurl=baseurl,
430 )
431 )
432
433 def uri_loader(self, inner, scoped_id, vocab_term, ref_scope):
434 # type: (TypeDef, bool, bool, Union[int, None]) -> TypeDef
435 return self.declare_type(
436 TypeDef(
437 "uri_{}_{}_{}_{}".format(inner.name, scoped_id, vocab_term, ref_scope),
438 "_URILoader({}, {}, {}, {})".format(
439 inner.name, scoped_id, vocab_term, ref_scope
440 ),
441 is_uri=True,
442 scoped_id=scoped_id,
443 ref_scope=ref_scope,
444 )
445 )
446
447 def idmap_loader(self, field, inner, map_subject, map_predicate):
448 # type: (Text, TypeDef, Text, Union[Text, None]) -> TypeDef
449 return self.declare_type(
450 TypeDef(
451 "idmap_{}_{}".format(self.safe_name(field), inner.name),
452 "_IdMapLoader({}, '{}', '{}')".format(
453 inner.name, map_subject, map_predicate
454 ),
455 )
456 )
457
458 def typedsl_loader(self, inner, ref_scope):
459 # type: (TypeDef, Union[int, None]) -> TypeDef
460 return self.declare_type(
461 TypeDef(
462 "typedsl_{}_{}".format(inner.name, ref_scope),
463 "_TypeDSLLoader({}, {})".format(inner.name, ref_scope),
464 )
465 )
466
467 def epilogue(self, root_loader):
468 # type: (TypeDef) -> None
469 self.out.write(u"_vocab = {\n")
470 for k in sorted(self.vocab.keys()):
471 self.out.write(u' "{}": "{}",\n'.format(k, self.vocab[k]))
472 self.out.write(u"}\n")
473
474 self.out.write(u"_rvocab = {\n")
475 for k in sorted(self.vocab.keys()):
476 self.out.write(u' "{}": "{}",\n'.format(self.vocab[k], k))
477 self.out.write(u"}\n\n")
478
479 for _, collected_type in iteritems(self.collected_types):
480 self.out.write(
481 u"{} = {}\n".format(collected_type.name, collected_type.init)
482 )
483 self.out.write(u"\n")
484
485 self.out.write(
486 u"""
487 def load_document(doc, baseuri=None, loadingOptions=None):
488 # type: (Any, Optional[Text], Optional[LoadingOptions]) -> Any
489 if baseuri is None:
490 baseuri = file_uri(os.getcwd()) + "/"
491 if loadingOptions is None:
492 loadingOptions = LoadingOptions()
493 return _document_load(%(name)s, doc, baseuri, loadingOptions)
494
495
496 def load_document_by_string(string, uri, loadingOptions=None):
497 # type: (Any, Text, Optional[LoadingOptions]) -> Any
498 result = yaml.round_trip_load(string, preserve_quotes=True)
499 add_lc_filename(result, uri)
500
501 if loadingOptions is None:
502 loadingOptions = LoadingOptions(fileuri=uri)
503 loadingOptions.idx[uri] = result
504
505 return _document_load(%(name)s, result, uri, loadingOptions)
506 """
507 % dict(name=root_loader.name)
508 )