view env/lib/python3.7/site-packages/schema_salad/python_codegen.py @ 3:758bc20232e8 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:20:52 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

"""Python code generator for a given schema salad definition."""
from typing import IO, Any, Dict, List, MutableMapping, MutableSequence, Union

from pkg_resources import resource_stream
from six import iteritems, itervalues, text_type
from io import StringIO
from typing_extensions import Text  # pylint: disable=unused-import

from . import schema
from .exceptions import SchemaException
from .codegen_base import CodeGenBase, TypeDef
from .schema import shortname

# move to a regular typing import when Python 3.3-3.6 is no longer supported


class PythonCodeGen(CodeGenBase):
    """Generation of Python code for a given Schema Salad definition."""

    def __init__(self, out):
        # type: (IO[Text]) -> None
        super(PythonCodeGen, self).__init__()
        self.out = out
        self.current_class_is_abstract = False
        self.serializer = StringIO()
        self.idfield = u""

    @staticmethod
    def safe_name(name):  # type: (Text) -> Text
        avn = schema.avro_name(name)
        if avn in ("class", "in"):
            # reserved words
            avn = avn + "_"
        return avn

    def prologue(self):
        # type: () -> None

        self.out.write(
            u"""#
# This file was autogenerated using schema-salad-tool --codegen=python
# The code itself is released under the Apache 2.0 license and the help text is
# subject to the license of the original schema.
#
"""
        )

        stream = resource_stream(__name__, "python_codegen_support.py")
        self.out.write(stream.read().decode("UTF-8"))
        stream.close()
        self.out.write(u"\n\n")

        for primative in itervalues(self.prims):
            self.declare_type(primative)

    def begin_class(
        self,  # pylint: disable=too-many-arguments
        classname,  # type: Text
        extends,  # type: MutableSequence[Text]
        doc,  # type: Text
        abstract,  # type: bool
        field_names,  # type: MutableSequence[Text]
        idfield,  # type: Text
    ):  # type: (...) -> None
        classname = self.safe_name(classname)

        if extends:
            ext = ", ".join(self.safe_name(e) for e in extends)
        else:
            ext = "Savable"

        self.out.write(u"class {}({}):\n".format(classname, ext))

        if doc:
            self.out.write(u'    """\n')
            self.out.write(text_type(doc))
            self.out.write(u'\n    """\n')

        self.serializer = StringIO()

        self.current_class_is_abstract = abstract
        if self.current_class_is_abstract:
            self.out.write(u"    pass\n\n\n")
            return

        safe_inits = ["        self,"]  # type: List[Text]
        safe_inits.extend(
            [
                "        {},  # type: Any".format(self.safe_name(f))
                for f in field_names
                if f != "class"
            ]
        )
        self.out.write(
            u"    def __init__(\n"
            + u"\n".join(safe_inits)
            + u"\n        extension_fields=None,  "
            + u"# type: Optional[Dict[Text, Any]]"
            + u"\n        loadingOptions=None  # type: Optional[LoadingOptions]"
            + u"\n    ):  # type: (...) -> None\n"
            + u"""
        if extension_fields:
            self.extension_fields = extension_fields
        else:
            self.extension_fields = yaml.comments.CommentedMap()
        if loadingOptions:
            self.loadingOptions = loadingOptions
        else:
            self.loadingOptions = LoadingOptions()
"""
        )
        field_inits = u""
        for name in field_names:
            if name == "class":
                field_inits += u"""        self.class_ = "{}"
""".format(
                    classname
                )
            else:
                field_inits += u"""        self.{0} = {0}
""".format(
                    self.safe_name(name)
                )
        self.out.write(
            field_inits
            + u"""
    @classmethod
    def fromDoc(cls, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, Text, LoadingOptions, Optional[Text]) -> {}

        _doc = copy.copy(doc)
        if hasattr(doc, 'lc'):
            _doc.lc.data = doc.lc.data
            _doc.lc.filename = doc.lc.filename
        _errors__ = []
""".format(
                classname
            )
        )

        self.idfield = idfield

        self.serializer.write(
            u"""
    def save(self, top=False, base_url="", relative_uris=True):
        # type: (bool, Text, bool) -> Dict[Text, Any]
        r = yaml.comments.CommentedMap()  # type: Dict[Text, Any]
        for ef in self.extension_fields:
            r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef]
"""
        )

        if "class" in field_names:
            self.out.write(
                u"""
        if _doc.get('class') != '{class_}':
            raise ValidationException("Not a {class_}")

""".format(
                    class_=classname
                )
            )

            self.serializer.write(
                u"""
        r['class'] = '{class_}'
""".format(
                    class_=classname
                )
            )

    def end_class(self, classname, field_names):
        # type: (Text, List[Text]) -> None

        if self.current_class_is_abstract:
            return

        self.out.write(
            u"""
        extension_fields = yaml.comments.CommentedMap()
        for k in _doc.keys():
            if k not in cls.attrs:
                if ":" in k:
                    ex = expand_url(k,
                                    u"",
                                    loadingOptions,
                                    scoped_id=False,
                                    vocab_term=False)
                    extension_fields[ex] = _doc[k]
                else:
                    _errors__.append(
                        ValidationException(
                            "invalid field `%s`, expected one of: {attrstr}" % (k),
                            SourceLine(_doc, k, str)
                        )
                    )
                    break

        if _errors__:
            raise ValidationException(\"Trying '{class_}'\", None, _errors__)
""".format(
                attrstr=", ".join(["`{}`".format(f) for f in field_names]),
                class_=self.safe_name(classname),
            )
        )

        self.serializer.write(
            u"""
        if top and self.loadingOptions.namespaces:
            r["$namespaces"] = self.loadingOptions.namespaces

"""
        )

        self.serializer.write(u"        return r\n\n")

        self.serializer.write(
            u"    attrs = frozenset({attrs})\n".format(attrs=field_names)
        )

        safe_inits = [
            self.safe_name(f) for f in field_names if f != "class"
        ]  # type: List[Text]

        safe_inits.extend(
            ["extension_fields=extension_fields", "loadingOptions=loadingOptions"]
        )

        self.out.write(
            u"""        loadingOptions = copy.deepcopy(loadingOptions)
        loadingOptions.original_doc = _doc
"""
        )
        self.out.write(u"        return cls(" + ", ".join(safe_inits) + ")\n")

        self.out.write(text_type(self.serializer.getvalue()))

        self.out.write(u"\n\n")

    prims = {
        u"http://www.w3.org/2001/XMLSchema#string": TypeDef(
            "strtype", "_PrimitiveLoader((str, text_type))"
        ),
        u"http://www.w3.org/2001/XMLSchema#int": TypeDef(
            "inttype", "_PrimitiveLoader(int)"
        ),
        u"http://www.w3.org/2001/XMLSchema#long": TypeDef(
            "inttype", "_PrimitiveLoader(int)"
        ),
        u"http://www.w3.org/2001/XMLSchema#float": TypeDef(
            "floattype", "_PrimitiveLoader(float)"
        ),
        u"http://www.w3.org/2001/XMLSchema#double": TypeDef(
            "floattype", "_PrimitiveLoader(float)"
        ),
        u"http://www.w3.org/2001/XMLSchema#boolean": TypeDef(
            "booltype", "_PrimitiveLoader(bool)"
        ),
        u"https://w3id.org/cwl/salad#null": TypeDef(
            "None_type", "_PrimitiveLoader(type(None))"
        ),
        u"https://w3id.org/cwl/salad#Any": TypeDef("Any_type", "_AnyLoader()"),
    }

    def type_loader(self, type_declaration):
        # type: (Union[List[Any], Dict[Text, Any], Text]) -> TypeDef

        if isinstance(type_declaration, MutableSequence):
            sub = [self.type_loader(i) for i in type_declaration]
            return self.declare_type(
                TypeDef(
                    "union_of_{}".format("_or_".join(s.name for s in sub)),
                    "_UnionLoader(({},))".format(", ".join(s.name for s in sub)),
                )
            )
        if isinstance(type_declaration, MutableMapping):
            if type_declaration["type"] in (
                "array",
                "https://w3id.org/cwl/salad#array",
            ):
                i = self.type_loader(type_declaration["items"])
                return self.declare_type(
                    TypeDef(
                        "array_of_{}".format(i.name), "_ArrayLoader({})".format(i.name)
                    )
                )
            if type_declaration["type"] in ("enum", "https://w3id.org/cwl/salad#enum"):
                for sym in type_declaration["symbols"]:
                    self.add_vocab(shortname(sym), sym)
                return self.declare_type(
                    TypeDef(
                        self.safe_name(type_declaration["name"]) + "Loader",
                        '_EnumLoader(("{}",))'.format(
                            '", "'.join(
                                self.safe_name(sym)
                                for sym in type_declaration["symbols"]
                            )
                        ),
                    )
                )
            if type_declaration["type"] in (
                "record",
                "https://w3id.org/cwl/salad#record",
            ):
                return self.declare_type(
                    TypeDef(
                        self.safe_name(type_declaration["name"]) + "Loader",
                        "_RecordLoader({})".format(
                            self.safe_name(type_declaration["name"])
                        ),
                    )
                )
            raise SchemaException("wft {}".format(type_declaration["type"]))
        if type_declaration in self.prims:
            return self.prims[type_declaration]
        return self.collected_types[self.safe_name(type_declaration) + "Loader"]

    def declare_id_field(self, name, fieldtype, doc, optional):
        # type: (Text, TypeDef, Text, bool) -> None

        if self.current_class_is_abstract:
            return

        self.declare_field(name, fieldtype, doc, True)

        if optional:
            opt = """{safename} = "_:" + str(_uuid__.uuid4())""".format(
                safename=self.safe_name(name)
            )
        else:
            opt = """raise ValidationException("Missing {fieldname}")""".format(
                fieldname=shortname(name)
            )

        self.out.write(
            u"""
        if {safename} is None:
            if docRoot is not None:
                {safename} = docRoot
            else:
                {opt}
        baseuri = {safename}
""".format(
                safename=self.safe_name(name), fieldname=shortname(name), opt=opt
            )
        )

    def declare_field(self, name, fieldtype, doc, optional):
        # type: (Text, TypeDef, Text, bool) -> None

        if self.current_class_is_abstract:
            return

        if shortname(name) == "class":
            return

        if optional:
            self.out.write(
                u"        if '{fieldname}' in _doc:\n".format(fieldname=shortname(name))
            )
            spc = "    "
        else:
            spc = ""
        self.out.write(
            u"""{spc}        try:
{spc}            {safename} = load_field(_doc.get(
{spc}                '{fieldname}'), {fieldtype}, baseuri, loadingOptions)
{spc}        except ValidationException as e:
{spc}            _errors__.append(
{spc}                ValidationException(
{spc}                    \"the `{fieldname}` field is not valid because:\",
{spc}                    SourceLine(_doc, '{fieldname}', str),
{spc}                    [e]
{spc}                )
{spc}            )
""".format(
                safename=self.safe_name(name),
                fieldname=shortname(name),
                fieldtype=fieldtype.name,
                spc=spc,
            )
        )
        if optional:
            self.out.write(
                u"""        else:
            {safename} = None
""".format(
                    safename=self.safe_name(name)
                )
            )

        if name == self.idfield or not self.idfield:
            baseurl = "base_url"
        else:
            baseurl = "self.{}".format(self.safe_name(self.idfield))

        if fieldtype.is_uri:
            self.serializer.write(
                u"""
        if self.{safename} is not None:
            u = save_relative_uri(
                self.{safename},
                {baseurl},
                {scoped_id},
                {ref_scope},
                relative_uris)
            if u:
                r['{fieldname}'] = u
""".format(
                    safename=self.safe_name(name),
                    fieldname=shortname(name).strip(),
                    baseurl=baseurl,
                    scoped_id=fieldtype.scoped_id,
                    ref_scope=fieldtype.ref_scope,
                )
            )
        else:
            self.serializer.write(
                u"""
        if self.{safename} is not None:
            r['{fieldname}'] = save(
                self.{safename},
                top=False,
                base_url={baseurl},
                relative_uris=relative_uris)
""".format(
                    safename=self.safe_name(name),
                    fieldname=shortname(name),
                    baseurl=baseurl,
                )
            )

    def uri_loader(self, inner, scoped_id, vocab_term, ref_scope):
        # type: (TypeDef, bool, bool, Union[int, None]) -> TypeDef
        return self.declare_type(
            TypeDef(
                "uri_{}_{}_{}_{}".format(inner.name, scoped_id, vocab_term, ref_scope),
                "_URILoader({}, {}, {}, {})".format(
                    inner.name, scoped_id, vocab_term, ref_scope
                ),
                is_uri=True,
                scoped_id=scoped_id,
                ref_scope=ref_scope,
            )
        )

    def idmap_loader(self, field, inner, map_subject, map_predicate):
        # type: (Text, TypeDef, Text, Union[Text, None]) -> TypeDef
        return self.declare_type(
            TypeDef(
                "idmap_{}_{}".format(self.safe_name(field), inner.name),
                "_IdMapLoader({}, '{}', '{}')".format(
                    inner.name, map_subject, map_predicate
                ),
            )
        )

    def typedsl_loader(self, inner, ref_scope):
        # type: (TypeDef, Union[int, None]) -> TypeDef
        return self.declare_type(
            TypeDef(
                "typedsl_{}_{}".format(inner.name, ref_scope),
                "_TypeDSLLoader({}, {})".format(inner.name, ref_scope),
            )
        )

    def epilogue(self, root_loader):
        # type: (TypeDef) -> None
        self.out.write(u"_vocab = {\n")
        for k in sorted(self.vocab.keys()):
            self.out.write(u'    "{}": "{}",\n'.format(k, self.vocab[k]))
        self.out.write(u"}\n")

        self.out.write(u"_rvocab = {\n")
        for k in sorted(self.vocab.keys()):
            self.out.write(u'    "{}": "{}",\n'.format(self.vocab[k], k))
        self.out.write(u"}\n\n")

        for _, collected_type in iteritems(self.collected_types):
            self.out.write(
                u"{} = {}\n".format(collected_type.name, collected_type.init)
            )
        self.out.write(u"\n")

        self.out.write(
            u"""
def load_document(doc, baseuri=None, loadingOptions=None):
    # type: (Any, Optional[Text], Optional[LoadingOptions]) -> Any
    if baseuri is None:
        baseuri = file_uri(os.getcwd()) + "/"
    if loadingOptions is None:
        loadingOptions = LoadingOptions()
    return _document_load(%(name)s, doc, baseuri, loadingOptions)


def load_document_by_string(string, uri, loadingOptions=None):
    # type: (Any, Text, Optional[LoadingOptions]) -> Any
    result = yaml.round_trip_load(string, preserve_quotes=True)
    add_lc_filename(result, uri)

    if loadingOptions is None:
        loadingOptions = LoadingOptions(fileuri=uri)
    loadingOptions.idx[uri] = result

    return _document_load(%(name)s, result, uri, loadingOptions)
"""
            % dict(name=root_loader.name)
        )