Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/schema_salad/python_codegen.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 """Python code generator for a given schema salad definition.""" | |
2 from typing import IO, Any, Dict, List, MutableMapping, MutableSequence, Union | |
3 | |
4 from pkg_resources import resource_stream | |
5 from six import iteritems, itervalues, text_type | |
6 from io import StringIO | |
7 from typing_extensions import Text # pylint: disable=unused-import | |
8 | |
9 from . import schema | |
10 from .exceptions import SchemaException | |
11 from .codegen_base import CodeGenBase, TypeDef | |
12 from .schema import shortname | |
13 | |
14 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
15 | |
16 | |
17 class PythonCodeGen(CodeGenBase): | |
18 """Generation of Python code for a given Schema Salad definition.""" | |
19 | |
def __init__(self, out):
    # type: (IO[Text]) -> None
    """Create a generator that writes the produced Python source to ``out``."""
    super(PythonCodeGen, self).__init__()
    # Per-class bookkeeping; begin_class() replaces these for each class.
    self.idfield = u""
    self.current_class_is_abstract = False
    # Text destined for the generated save() method is accumulated here and
    # copied to ``out`` by end_class().
    self.serializer = StringIO()
    self.out = out
27 | |
@staticmethod
def safe_name(name):  # type: (Text) -> Text
    """Return ``schema.avro_name(name)`` made usable as a Python identifier.

    When the converted name is ``class`` or ``in`` a trailing underscore is
    appended; every other name is returned as converted.
    """
    candidate = schema.avro_name(name)
    # reserved words
    return candidate + "_" if candidate in ("class", "in") else candidate
35 | |
def prologue(self):
    # type: () -> None
    """Write the fixed preamble of the generated module to ``self.out``.

    Emits the autogeneration/licence banner, then the decoded contents of the
    packaged ``python_codegen_support.py`` resource followed by two newlines,
    and finally passes every value of ``self.prims`` to ``declare_type``.
    """
    self.out.write(
        u"""#
# This file was autogenerated using schema-salad-tool --codegen=python
# The code itself is released under the Apache 2.0 license and the help text is
# subject to the license of the original schema.
#
"""
    )

    stream = resource_stream(__name__, "python_codegen_support.py")
    try:
        self.out.write(stream.read().decode("UTF-8"))
    finally:
        # Release the resource handle even when reading, decoding or writing
        # raises, instead of leaking it.
        stream.close()
    self.out.write(u"\n\n")

    for primitive in itervalues(self.prims):
        self.declare_type(primitive)
55 | |
def begin_class(
    self,  # pylint: disable=too-many-arguments
    classname,  # type: Text
    extends,  # type: MutableSequence[Text]
    doc,  # type: Text
    abstract,  # type: bool
    field_names,  # type: MutableSequence[Text]
    idfield,  # type: Text
):  # type: (...) -> None
    """Start emitting the Python class for one schema type.

    Writes the ``class`` statement (and the docstring, when ``doc`` is
    non-empty) to ``self.out`` and resets ``self.serializer``.  For an
    abstract class only ``pass`` is written and the method returns.  For a
    concrete class it additionally writes an ``__init__`` taking one
    parameter per field (``class`` excluded) plus ``extension_fields`` and
    ``loadingOptions``, writes the opening of a ``fromDoc`` classmethod, and
    begins the generated ``save`` method in ``self.serializer``.

    Args:
        classname: schema name of the class; passed through ``safe_name``.
        extends: names of the base classes; ``Savable`` is used when empty.
        doc: documentation text copied into the generated docstring.
        abstract: when true, only a stub class is generated.
        field_names: names of the fields of the class.
        idfield: name of the identifier field; stored in ``self.idfield``
            (only reached for non-abstract classes).
    """
    # NOTE(review): the leading whitespace inside the template literals in
    # this method looks collapsed in this copy of the file (the emitted code
    # would not be validly indented) -- confirm against the upstream file.
    classname = self.safe_name(classname)

    if extends:
        ext = ", ".join(self.safe_name(e) for e in extends)
    else:
        ext = "Savable"

    self.out.write(u"class {}({}):\n".format(classname, ext))

    if doc:
        self.out.write(u' """\n')
        self.out.write(text_type(doc))
        self.out.write(u'\n """\n')

    # Fresh buffer for this class's save() body; flushed by end_class().
    self.serializer = StringIO()

    self.current_class_is_abstract = abstract
    if self.current_class_is_abstract:
        self.out.write(u" pass\n\n\n")
        return

    # Parameter list of the generated __init__, one entry per line.
    safe_inits = [" self,"]  # type: List[Text]
    safe_inits.extend(
        [
            " {}, # type: Any".format(self.safe_name(f))
            for f in field_names
            if f != "class"
        ]
    )
    self.out.write(
        u" def __init__(\n"
        + u"\n".join(safe_inits)
        + u"\n extension_fields=None, "
        + u"# type: Optional[Dict[Text, Any]]"
        + u"\n loadingOptions=None # type: Optional[LoadingOptions]"
        + u"\n ): # type: (...) -> None\n"
        + u"""
if extension_fields:
self.extension_fields = extension_fields
else:
self.extension_fields = yaml.comments.CommentedMap()
if loadingOptions:
self.loadingOptions = loadingOptions
else:
self.loadingOptions = LoadingOptions()
"""
    )
    # Attribute assignments of the generated __init__; "class" is not a
    # parameter and is set to the class name itself.
    field_inits = u""
    for name in field_names:
        if name == "class":
            field_inits += u""" self.class_ = "{}"
""".format(
                classname
            )
        else:
            field_inits += u""" self.{0} = {0}
""".format(
                self.safe_name(name)
            )
    # Opening of the generated fromDoc(); the per-field loading code is
    # appended by declare_field()/declare_id_field() and closed by end_class().
    self.out.write(
        field_inits
        + u"""
@classmethod
def fromDoc(cls, doc, baseuri, loadingOptions, docRoot=None):
# type: (Any, Text, LoadingOptions, Optional[Text]) -> {}

_doc = copy.copy(doc)
if hasattr(doc, 'lc'):
_doc.lc.data = doc.lc.data
_doc.lc.filename = doc.lc.filename
_errors__ = []
""".format(
            classname
        )
    )

    self.idfield = idfield

    # Opening of the generated save(); kept in the side buffer so it can be
    # written after fromDoc() is complete.
    self.serializer.write(
        u"""
def save(self, top=False, base_url="", relative_uris=True):
# type: (bool, Text, bool) -> Dict[Text, Any]
r = yaml.comments.CommentedMap() # type: Dict[Text, Any]
for ef in self.extension_fields:
r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef]
"""
    )

    if "class" in field_names:
        # Generated fromDoc() rejects documents whose 'class' differs ...
        self.out.write(
            u"""
if _doc.get('class') != '{class_}':
raise ValidationException("Not a {class_}")

""".format(
                class_=classname
            )
        )

        # ... and generated save() always writes the class name back.
        self.serializer.write(
            u"""
r['class'] = '{class_}'
""".format(
                class_=classname
            )
        )
171 | |
def end_class(self, classname, field_names):
    # type: (Text, List[Text]) -> None
    """Finish the class started by ``begin_class``.

    Does nothing for an abstract class.  Otherwise it completes the generated
    ``fromDoc`` (collection of extension fields, reporting of unknown fields,
    raising of accumulated errors, construction of the instance), completes
    the generated ``save`` held in ``self.serializer`` together with the
    ``attrs`` frozenset, and writes the buffered text to ``self.out``.

    Args:
        classname: schema name of the class; passed through ``safe_name``
            for the error message.
        field_names: names of the fields of the class.
    """
    # NOTE(review): the leading whitespace inside the template literals in
    # this method looks collapsed in this copy of the file (the emitted code
    # would not be validly indented) -- confirm against the upstream file.

    if self.current_class_is_abstract:
        return

    # Tail of fromDoc(): keys not in cls.attrs are either extension fields
    # (when they contain ":") or reported as invalid.
    self.out.write(
        u"""
extension_fields = yaml.comments.CommentedMap()
for k in _doc.keys():
if k not in cls.attrs:
if ":" in k:
ex = expand_url(k,
u"",
loadingOptions,
scoped_id=False,
vocab_term=False)
extension_fields[ex] = _doc[k]
else:
_errors__.append(
ValidationException(
"invalid field `%s`, expected one of: {attrstr}" % (k),
SourceLine(_doc, k, str)
)
)
break

if _errors__:
raise ValidationException(\"Trying '{class_}'\", None, _errors__)
""".format(
            attrstr=", ".join(["`{}`".format(f) for f in field_names]),
            class_=self.safe_name(classname),
        )
    )

    # Tail of save().
    self.serializer.write(
        u"""
if top and self.loadingOptions.namespaces:
r["$namespaces"] = self.loadingOptions.namespaces

"""
    )

    self.serializer.write(u" return r\n\n")

    # The list is formatted with its repr, giving a literal in the output.
    self.serializer.write(
        u" attrs = frozenset({attrs})\n".format(attrs=field_names)
    )

    # Arguments of the cls(...) call that ends fromDoc().
    safe_inits = [
        self.safe_name(f) for f in field_names if f != "class"
    ]  # type: List[Text]

    safe_inits.extend(
        ["extension_fields=extension_fields", "loadingOptions=loadingOptions"]
    )

    self.out.write(
        u""" loadingOptions = copy.deepcopy(loadingOptions)
loadingOptions.original_doc = _doc
"""
    )
    self.out.write(u" return cls(" + ", ".join(safe_inits) + ")\n")

    # save() and attrs follow fromDoc() in the generated class.
    self.out.write(text_type(self.serializer.getvalue()))

    self.out.write(u"\n\n")
239 | |
# Built-in types, keyed by type URI.  Each TypeDef is given a name and the
# text of the expression that constructs its loader (presumably the
# positional parameters are name and init -- confirm against codegen_base).
# prologue() passes every value to declare_type(); type_loader() returns these
# entries directly for matching URIs.
prims = {
    u"http://www.w3.org/2001/XMLSchema#string": TypeDef(
        "strtype", "_PrimitiveLoader((str, text_type))"
    ),
    # int and long share one loader name.
    u"http://www.w3.org/2001/XMLSchema#int": TypeDef(
        "inttype", "_PrimitiveLoader(int)"
    ),
    u"http://www.w3.org/2001/XMLSchema#long": TypeDef(
        "inttype", "_PrimitiveLoader(int)"
    ),
    # float and double share one loader name.
    u"http://www.w3.org/2001/XMLSchema#float": TypeDef(
        "floattype", "_PrimitiveLoader(float)"
    ),
    u"http://www.w3.org/2001/XMLSchema#double": TypeDef(
        "floattype", "_PrimitiveLoader(float)"
    ),
    u"http://www.w3.org/2001/XMLSchema#boolean": TypeDef(
        "booltype", "_PrimitiveLoader(bool)"
    ),
    u"https://w3id.org/cwl/salad#null": TypeDef(
        "None_type", "_PrimitiveLoader(type(None))"
    ),
    u"https://w3id.org/cwl/salad#Any": TypeDef("Any_type", "_AnyLoader()"),
}
264 | |
def type_loader(self, type_declaration):
    # type: (Union[List[Any], Dict[Text, Any], Text]) -> TypeDef
    """Return the loader ``TypeDef`` for a schema type declaration.

    A sequence is treated as a union of its members, a mapping as an
    ``array``, ``enum`` or ``record`` declaration (selected by its ``type``
    entry), and anything else as a type name that is looked up first in
    ``self.prims`` and then in ``self.collected_types``.

    Raises:
        SchemaException: for a mapping whose ``type`` is none of the above.
    """
    if isinstance(type_declaration, MutableSequence):
        member_names = [
            self.type_loader(member).name for member in type_declaration
        ]
        union = TypeDef(
            "union_of_{}".format("_or_".join(member_names)),
            "_UnionLoader(({},))".format(", ".join(member_names)),
        )
        return self.declare_type(union)

    if not isinstance(type_declaration, MutableMapping):
        # A plain name: either a primitive or a previously declared loader.
        if type_declaration in self.prims:
            return self.prims[type_declaration]
        return self.collected_types[self.safe_name(type_declaration) + "Loader"]

    kind = type_declaration["type"]

    if kind in ("array", "https://w3id.org/cwl/salad#array"):
        item_loader = self.type_loader(type_declaration["items"])
        array = TypeDef(
            "array_of_{}".format(item_loader.name),
            "_ArrayLoader({})".format(item_loader.name),
        )
        return self.declare_type(array)

    if kind in ("enum", "https://w3id.org/cwl/salad#enum"):
        symbols = type_declaration["symbols"]
        # Every symbol is added to the vocabulary before the loader is built.
        for symbol in symbols:
            self.add_vocab(shortname(symbol), symbol)
        loader_name = self.safe_name(type_declaration["name"]) + "Loader"
        quoted_symbols = '", "'.join(self.safe_name(symbol) for symbol in symbols)
        return self.declare_type(
            TypeDef(loader_name, '_EnumLoader(("{}",))'.format(quoted_symbols))
        )

    if kind in ("record", "https://w3id.org/cwl/salad#record"):
        record_name = self.safe_name(type_declaration["name"])
        return self.declare_type(
            TypeDef(record_name + "Loader", "_RecordLoader({})".format(record_name))
        )

    raise SchemaException("wft {}".format(kind))
317 | |
def declare_id_field(self, name, fieldtype, doc, optional):
    # type: (Text, TypeDef, Text, bool) -> None
    """Emit loading code for the identifier field of the current class.

    Does nothing for an abstract class.  The field is first declared through
    ``declare_field`` (always as optional); then code is written that, when
    the loaded value is ``None``, falls back to ``docRoot`` and otherwise
    either generates a ``_:``-prefixed UUID (``optional`` true) or raises
    ``ValidationException`` (``optional`` false).  The generated code then
    rebinds ``baseuri`` to the identifier.

    Args:
        name: name of the identifier field.
        fieldtype: loader used for the field.
        doc: documentation of the field; forwarded to ``declare_field``.
        optional: whether a missing identifier may be replaced by a
            generated one.
    """
    # NOTE(review): the leading whitespace inside the template literal below
    # looks collapsed in this copy of the file -- confirm against upstream.

    if self.current_class_is_abstract:
        return

    self.declare_field(name, fieldtype, doc, True)

    # Statement executed in the generated code when no identifier and no
    # docRoot are available.
    if optional:
        opt = """{safename} = "_:" + str(_uuid__.uuid4())""".format(
            safename=self.safe_name(name)
        )
    else:
        opt = """raise ValidationException("Missing {fieldname}")""".format(
            fieldname=shortname(name)
        )

    # ``fieldname`` is supplied but not referenced by this template.
    self.out.write(
        u"""
if {safename} is None:
if docRoot is not None:
{safename} = docRoot
else:
{opt}
baseuri = {safename}
""".format(
            safename=self.safe_name(name), fieldname=shortname(name), opt=opt
        )
    )
347 | |
def declare_field(self, name, fieldtype, doc, optional):
    # type: (Text, TypeDef, Text, bool) -> None
    """Emit the loading and saving code for one field of the current class.

    Does nothing for an abstract class or for the ``class`` field.  Writes to
    ``self.out`` a ``load_field`` call wrapped in ``try``/``except`` that
    records a ``ValidationException`` in ``_errors__``; for an optional field
    the call is guarded by a presence test and the variable defaults to
    ``None``.  Writes to ``self.serializer`` the matching fragment of the
    generated ``save`` method, using ``save_relative_uri`` when
    ``fieldtype.is_uri`` is set and ``save`` otherwise.

    Args:
        name: name of the field.
        fieldtype: loader used for the field.
        doc: documentation of the field (not used by this method).
        optional: whether the field may be absent from the document.
    """
    # NOTE(review): the leading whitespace inside the template literals in
    # this method (and in ``spc``) looks collapsed in this copy of the file
    # -- confirm against the upstream file.

    if self.current_class_is_abstract:
        return

    if shortname(name) == "class":
        return

    if optional:
        self.out.write(
            u" if '{fieldname}' in _doc:\n".format(fieldname=shortname(name))
        )
        # Extra indentation that nests the try block under the ``if`` above.
        spc = " "
    else:
        spc = ""
    self.out.write(
        u"""{spc} try:
{spc} {safename} = load_field(_doc.get(
{spc} '{fieldname}'), {fieldtype}, baseuri, loadingOptions)
{spc} except ValidationException as e:
{spc} _errors__.append(
{spc} ValidationException(
{spc} \"the `{fieldname}` field is not valid because:\",
{spc} SourceLine(_doc, '{fieldname}', str),
{spc} [e]
{spc} )
{spc} )
""".format(
            safename=self.safe_name(name),
            fieldname=shortname(name),
            fieldtype=fieldtype.name,
            spc=spc,
        )
    )
    if optional:
        self.out.write(
            u""" else:
{safename} = None
""".format(
                safename=self.safe_name(name)
            )
        )

    # The identifier field itself (or any field of a class without one) is
    # saved relative to save()'s base_url; other fields are saved relative
    # to the object's identifier.
    if name == self.idfield or not self.idfield:
        baseurl = "base_url"
    else:
        baseurl = "self.{}".format(self.safe_name(self.idfield))

    if fieldtype.is_uri:
        self.serializer.write(
            u"""
if self.{safename} is not None:
u = save_relative_uri(
self.{safename},
{baseurl},
{scoped_id},
{ref_scope},
relative_uris)
if u:
r['{fieldname}'] = u
""".format(
                safename=self.safe_name(name),
                fieldname=shortname(name).strip(),
                baseurl=baseurl,
                scoped_id=fieldtype.scoped_id,
                ref_scope=fieldtype.ref_scope,
            )
        )
    else:
        self.serializer.write(
            u"""
if self.{safename} is not None:
r['{fieldname}'] = save(
self.{safename},
top=False,
base_url={baseurl},
relative_uris=relative_uris)
""".format(
                safename=self.safe_name(name),
                fieldname=shortname(name),
                baseurl=baseurl,
            )
        )
432 | |
def uri_loader(self, inner, scoped_id, vocab_term, ref_scope):
    # type: (TypeDef, bool, bool, Union[int, None]) -> TypeDef
    """Declare and return a ``_URILoader`` type wrapping ``inner``.

    The four settings are encoded in both the name of the declared type and
    the constructor expression; the resulting ``TypeDef`` is flagged as a URI
    and carries ``scoped_id`` and ``ref_scope``.
    """
    settings = (inner.name, scoped_id, vocab_term, ref_scope)
    loader = TypeDef(
        "uri_{}_{}_{}_{}".format(*settings),
        "_URILoader({}, {}, {}, {})".format(*settings),
        is_uri=True,
        scoped_id=scoped_id,
        ref_scope=ref_scope,
    )
    return self.declare_type(loader)
446 | |
def idmap_loader(self, field, inner, map_subject, map_predicate):
    # type: (Text, TypeDef, Text, Union[Text, None]) -> TypeDef
    """Declare and return an ``_IdMapLoader`` type for ``field``.

    The declared type is named after the safe form of ``field`` and the name
    of ``inner``; ``map_subject`` and ``map_predicate`` are embedded as
    single-quoted text in the constructor expression.
    """
    # NOTE(review): a ``map_predicate`` of None is rendered as the quoted
    # text 'None' -- confirm that this is what the loader expects.
    type_name = "idmap_{}_{}".format(self.safe_name(field), inner.name)
    constructor = "_IdMapLoader({}, '{}', '{}')".format(
        inner.name, map_subject, map_predicate
    )
    return self.declare_type(TypeDef(type_name, constructor))
457 | |
def typedsl_loader(self, inner, ref_scope):
    # type: (TypeDef, Union[int, None]) -> TypeDef
    """Declare and return a ``_TypeDSLLoader`` type wrapping ``inner``.

    Both the name of the declared type and its constructor expression are
    built from the name of ``inner`` and ``ref_scope``.
    """
    settings = (inner.name, ref_scope)
    type_name = "typedsl_{}_{}".format(*settings)
    constructor = "_TypeDSLLoader({}, {})".format(*settings)
    return self.declare_type(TypeDef(type_name, constructor))
466 | |
def epilogue(self, root_loader):
    # type: (TypeDef) -> None
    """Write the closing part of the generated module to ``self.out``.

    Emits the ``_vocab`` dictionary (entries in sorted key order), its
    inverse ``_rvocab``, one ``name = init`` assignment per entry of
    ``self.collected_types``, and the ``load_document`` and
    ``load_document_by_string`` functions, which load with the loader named
    by ``root_loader``.

    Args:
        root_loader: type whose ``name`` is used as the top-level loader in
            the generated entry points.
    """
    # NOTE(review): the leading whitespace inside the template literals in
    # this method looks collapsed in this copy of the file (the emitted code
    # would not be validly indented) -- confirm against the upstream file.
    self.out.write(u"_vocab = {\n")
    for k in sorted(self.vocab.keys()):
        self.out.write(u' "{}": "{}",\n'.format(k, self.vocab[k]))
    self.out.write(u"}\n")

    # Same entries with key and value swapped.
    self.out.write(u"_rvocab = {\n")
    for k in sorted(self.vocab.keys()):
        self.out.write(u' "{}": "{}",\n'.format(self.vocab[k], k))
    self.out.write(u"}\n\n")

    # Loader definitions, in the iteration order of collected_types.
    for _, collected_type in iteritems(self.collected_types):
        self.out.write(
            u"{} = {}\n".format(collected_type.name, collected_type.init)
        )
    self.out.write(u"\n")

    # %-formatting is used here, so ``%(name)s`` is the only placeholder.
    self.out.write(
        u"""
def load_document(doc, baseuri=None, loadingOptions=None):
# type: (Any, Optional[Text], Optional[LoadingOptions]) -> Any
if baseuri is None:
baseuri = file_uri(os.getcwd()) + "/"
if loadingOptions is None:
loadingOptions = LoadingOptions()
return _document_load(%(name)s, doc, baseuri, loadingOptions)


def load_document_by_string(string, uri, loadingOptions=None):
# type: (Any, Text, Optional[LoadingOptions]) -> Any
result = yaml.round_trip_load(string, preserve_quotes=True)
add_lc_filename(result, uri)

if loadingOptions is None:
loadingOptions = LoadingOptions(fileuri=uri)
loadingOptions.idx[uri] = result

return _document_load(%(name)s, result, uri, loadingOptions)
"""
        % dict(name=root_loader.name)
    )