diff env/lib/python3.7/site-packages/schema_salad/schema.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
--- a/env/lib/python3.7/site-packages/schema_salad/schema.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,774 +0,0 @@
-"""Functions to process Schema Salad schemas."""
-from __future__ import absolute_import
-
-import copy
-import hashlib
-from typing import (
-    IO,
-    Any,
-    Dict,
-    List,
-    Mapping,
-    MutableMapping,
-    MutableSequence,
-    Optional,
-    Set,
-    Tuple,
-    TypeVar,
-    Union,
-    cast,
-)
-
-from future.utils import raise_from
-from pkg_resources import resource_stream
-from six import iteritems, string_types
-from six.moves import urllib
-from typing_extensions import Text  # pylint: disable=unused-import
-
-from ruamel import yaml
-from ruamel.yaml.comments import CommentedMap, CommentedSeq
-from schema_salad.utils import (
-    add_dictlist,
-    aslist,
-    convert_to_dict,
-    flatten,
-    json_dumps,
-)
-
-from . import _logger, jsonld_context, ref_resolver, validate
-from .exceptions import (
-    ClassValidationException,
-    ValidationException,
-    SchemaSaladException,
-)
-from .avro.schema import Names, SchemaParseException, make_avsc_object
-from .ref_resolver import Loader
-from .sourceline import SourceLine, add_lc_filename, relname
-
-SALAD_FILES = (
-    "metaschema.yml",
-    "metaschema_base.yml",
-    "salad.md",
-    "field_name.yml",
-    "import_include.md",
-    "link_res.yml",
-    "ident_res.yml",
-    "vocab_res.yml",
-    "vocab_res.yml",
-    "field_name_schema.yml",
-    "field_name_src.yml",
-    "field_name_proc.yml",
-    "ident_res_schema.yml",
-    "ident_res_src.yml",
-    "ident_res_proc.yml",
-    "link_res_schema.yml",
-    "link_res_src.yml",
-    "link_res_proc.yml",
-    "vocab_res_schema.yml",
-    "vocab_res_src.yml",
-    "vocab_res_proc.yml",
-    "map_res.yml",
-    "map_res_schema.yml",
-    "map_res_src.yml",
-    "map_res_proc.yml",
-    "typedsl_res.yml",
-    "typedsl_res_schema.yml",
-    "typedsl_res_src.yml",
-    "typedsl_res_proc.yml",
-    "sfdsl_res.yml",
-    "sfdsl_res_schema.yml",
-    "sfdsl_res_src.yml",
-    "sfdsl_res_proc.yml",
-)
-
-saladp = "https://w3id.org/cwl/salad#"
-
-
-def get_metaschema():  # type: () -> Tuple[Names, List[Dict[Text, Any]], Loader]
-    """Instantiate the metaschema."""
-    loader = ref_resolver.Loader(
-        {
-            "Any": saladp + "Any",
-            "ArraySchema": saladp + "ArraySchema",
-            "Array_symbol": saladp + "ArraySchema/type/Array_symbol",
-            "DocType": saladp + "DocType",
-            "Documentation": saladp + "Documentation",
-            "Documentation_symbol": saladp + "Documentation/type/Documentation_symbol",
-            "Documented": saladp + "Documented",
-            "EnumSchema": saladp + "EnumSchema",
-            "Enum_symbol": saladp + "EnumSchema/type/Enum_symbol",
-            "JsonldPredicate": saladp + "JsonldPredicate",
-            "NamedType": saladp + "NamedType",
-            "PrimitiveType": saladp + "PrimitiveType",
-            "RecordField": saladp + "RecordField",
-            "RecordSchema": saladp + "RecordSchema",
-            "Record_symbol": saladp + "RecordSchema/type/Record_symbol",
-            "SaladEnumSchema": saladp + "SaladEnumSchema",
-            "SaladRecordField": saladp + "SaladRecordField",
-            "SaladRecordSchema": saladp + "SaladRecordSchema",
-            "SchemaDefinedType": saladp + "SchemaDefinedType",
-            "SpecializeDef": saladp + "SpecializeDef",
-            "_container": saladp + "JsonldPredicate/_container",
-            "_id": {"@id": saladp + "_id", "@type": "@id", "identity": True},
-            "_type": saladp + "JsonldPredicate/_type",
-            "abstract": saladp + "SaladRecordSchema/abstract",
-            "array": saladp + "array",
-            "boolean": "http://www.w3.org/2001/XMLSchema#boolean",
-            "dct": "http://purl.org/dc/terms/",
-            "default": {"@id": saladp + "default", "noLinkCheck": True},
-            "doc": "rdfs:comment",
-            "docAfter": {"@id": saladp + "docAfter", "@type": "@id"},
-            "docChild": {"@id": saladp + "docChild", "@type": "@id"},
-            "docParent": {"@id": saladp + "docParent", "@type": "@id"},
-            "documentRoot": saladp + "SchemaDefinedType/documentRoot",
-            "documentation": saladp + "documentation",
-            "double": "http://www.w3.org/2001/XMLSchema#double",
-            "enum": saladp + "enum",
-            "extends": {"@id": saladp + "extends", "@type": "@id", "refScope": 1},
-            "fields": {
-                "@id": saladp + "fields",
-                "mapPredicate": "type",
-                "mapSubject": "name",
-            },
-            "float": "http://www.w3.org/2001/XMLSchema#float",
-            "identity": saladp + "JsonldPredicate/identity",
-            "inVocab": saladp + "NamedType/inVocab",
-            "int": "http://www.w3.org/2001/XMLSchema#int",
-            "items": {"@id": saladp + "items", "@type": "@vocab", "refScope": 2},
-            "jsonldPredicate": "sld:jsonldPredicate",
-            "long": "http://www.w3.org/2001/XMLSchema#long",
-            "mapPredicate": saladp + "JsonldPredicate/mapPredicate",
-            "mapSubject": saladp + "JsonldPredicate/mapSubject",
-            "name": "@id",
-            "noLinkCheck": saladp + "JsonldPredicate/noLinkCheck",
-            "null": saladp + "null",
-            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
-            "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
-            "record": saladp + "record",
-            "refScope": saladp + "JsonldPredicate/refScope",
-            "sld": saladp,
-            "specialize": {
-                "@id": saladp + "specialize",
-                "mapPredicate": "specializeTo",
-                "mapSubject": "specializeFrom",
-            },
-            "specializeFrom": {
-                "@id": saladp + "specializeFrom",
-                "@type": "@id",
-                "refScope": 1,
-            },
-            "specializeTo": {
-                "@id": saladp + "specializeTo",
-                "@type": "@id",
-                "refScope": 1,
-            },
-            "string": "http://www.w3.org/2001/XMLSchema#string",
-            "subscope": saladp + "JsonldPredicate/subscope",
-            "symbols": {"@id": saladp + "symbols", "@type": "@id", "identity": True},
-            "type": {
-                "@id": saladp + "type",
-                "@type": "@vocab",
-                "refScope": 2,
-                "typeDSL": True,
-            },
-            "typeDSL": saladp + "JsonldPredicate/typeDSL",
-            "xsd": "http://www.w3.org/2001/XMLSchema#",
-        }
-    )
-
-    for salad in SALAD_FILES:
-        with resource_stream(__name__, "metaschema/" + salad) as stream:
-            loader.cache["https://w3id.org/cwl/" + salad] = stream.read()
-
-    with resource_stream(__name__, "metaschema/metaschema.yml") as stream:
-        loader.cache["https://w3id.org/cwl/salad"] = stream.read()
-
-    j = yaml.round_trip_load(loader.cache["https://w3id.org/cwl/salad"])
-    add_lc_filename(j, "metaschema.yml")
-    j, _ = loader.resolve_all(j, saladp)
-
-    sch_obj = make_avro(j, loader)
-    try:
-        sch_names = make_avro_schema_from_avro(sch_obj)
-    except SchemaParseException:
-        _logger.error("Metaschema error, avro was:\n%s", json_dumps(sch_obj, indent=4))
-        raise
-    validate_doc(sch_names, j, loader, strict=True)
-    return (sch_names, j, loader)
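-
-# A usage sketch (illustrative, not part of the original module; it assumes
-# the returned Names object keys entries by their Avro-safe short names):
-#
-#     >>> sch_names, sch_doc, metaschema_loader = get_metaschema()
-#     >>> "RecordSchema" in sch_names.names
-#     True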
-
-
-def add_namespaces(metadata, namespaces):
-    # type: (Mapping[Text, Any], MutableMapping[Text, Text]) -> None
-    """Collect the provided namespaces, checking for conflicts."""
-    for key, value in metadata.items():
-        if key not in namespaces:
-            namespaces[key] = value
-        elif namespaces[key] != value:
-            raise ValidationException(
-                "Namespace prefix '{}' has conflicting definitions '{}'"
-                " and '{}'.".format(key, namespaces[key], value)
-            )
-
-
-def collect_namespaces(metadata):
-    # type: (Mapping[Text, Any]) -> Dict[Text, Text]
-    """Walk through the metadata object, collecting namespace declarations."""
-    namespaces = {}  # type: Dict[Text, Text]
-    if "$import_metadata" in metadata:
-        for value in metadata["$import_metadata"].values():
-            add_namespaces(collect_namespaces(value), namespaces)
-    if "$namespaces" in metadata:
-        add_namespaces(metadata["$namespaces"], namespaces)
-    return namespaces
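-
-# How the two helpers combine (a sketch; the prefixes are hypothetical):
-#
-#     >>> ns = {}
-#     >>> add_namespaces({"edam": "http://edamontology.org/"}, ns)
-#     >>> add_namespaces({"edam": "http://edamontology.org/"}, ns)  # same value: OK
-#     >>> add_namespaces({"edam": "http://example.com/"}, ns)  # conflict: raises
-#     Traceback (most recent call last):
-#         ...
-#     ValidationException: Namespace prefix 'edam' has conflicting definitions ...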
-
-
-schema_type = Tuple[Loader, Union[Names, SchemaParseException], Dict[Text, Any], Loader]
-
-
-def load_schema(
-    schema_ref,  # type: Union[CommentedMap, CommentedSeq, Text]
-    cache=None,  # type: Optional[Dict[Text, Text]]
-):
-    # type: (...) -> schema_type
-    """
-    Load a schema that can be used to validate documents using load_and_validate.
-
-    return: document_loader, avsc_names, schema_metadata, metaschema_loader
-    """
-
-    metaschema_names, _metaschema_doc, metaschema_loader = get_metaschema()
-    if cache is not None:
-        metaschema_loader.cache.update(cache)
-    schema_doc, schema_metadata = metaschema_loader.resolve_ref(schema_ref, "")
-
-    if not isinstance(schema_doc, MutableSequence):
-        raise ValidationException("Schema reference must resolve to a list.")
-
-    validate_doc(metaschema_names, schema_doc, metaschema_loader, True)
-    metactx = schema_metadata.get("@context", {})
-    metactx.update(collect_namespaces(schema_metadata))
-    schema_ctx = jsonld_context.salad_to_jsonld_context(schema_doc, metactx)[0]
-
-    # Create the loader that will be used to load the target document.
-    document_loader = Loader(schema_ctx, cache=cache)
-
-    # Make the Avro schema that will be used to validate the target
-    # document.
-    avsc_names = make_avro_schema(schema_doc, document_loader)
-
-    return document_loader, avsc_names, schema_metadata, metaschema_loader
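-
-# Typical call pattern (a sketch; the schema path is hypothetical):
-#
-#     >>> (document_loader, avsc_names, schema_metadata,
-#     ...  metaschema_loader) = load_schema("myschema.yml")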
-
-
-def load_and_validate(
-    document_loader,  # type: Loader
-    avsc_names,  # type: Names
-    document,  # type: Union[CommentedMap, Text]
-    strict,  # type: bool
-    strict_foreign_properties=False,  # type: bool
-):
-    # type: (...) -> Tuple[Any, Dict[Text, Any]]
-    """Load a document and validate it with the provided schema.
-
-    return data, metadata
-    """
-    try:
-        if isinstance(document, CommentedMap):
-            data, metadata = document_loader.resolve_all(
-                document,
-                document["id"],
-                checklinks=True,
-                strict_foreign_properties=strict_foreign_properties,
-            )
-        else:
-            data, metadata = document_loader.resolve_ref(
-                document,
-                checklinks=True,
-                strict_foreign_properties=strict_foreign_properties,
-            )
-
-        validate_doc(
-            avsc_names,
-            data,
-            document_loader,
-            strict,
-            strict_foreign_properties=strict_foreign_properties,
-        )
-    except ValidationException as exc:
-        raise_from(ValidationException("", None, [exc]), exc)
-    return data, metadata
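-
-# A sketch of the intended flow, reusing the objects returned by
-# load_schema() above ("mydoc.yml" is a hypothetical document path):
-#
-#     >>> data, metadata = load_and_validate(
-#     ...     document_loader, avsc_names, "mydoc.yml", strict=True)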
-
-
-def validate_doc(
-    schema_names,  # type: Names
-    doc,  # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text, None]
-    loader,  # type: Loader
-    strict,  # type: bool
-    strict_foreign_properties=False,  # type: bool
-):
-    # type: (...) -> None
-    """Validate a document using the provided schema."""
-    has_root = False
-    for root in schema_names.names.values():
-        if (hasattr(root, "get_prop") and root.get_prop(u"documentRoot")) or (
-            u"documentRoot" in root.props
-        ):
-            has_root = True
-            break
-
-    if not has_root:
-        raise ValidationException("No document roots defined in the schema")
-
-    if isinstance(doc, MutableSequence):
-        vdoc = doc
-    elif isinstance(doc, CommentedMap):
-        vdoc = CommentedSeq([doc])
-        vdoc.lc.add_kv_line_col(0, [doc.lc.line, doc.lc.col])
-        vdoc.lc.filename = doc.lc.filename
-    else:
-        raise ValidationException("Document must be dict or list")
-
-    roots = []
-    for root in schema_names.names.values():
-        if (hasattr(root, "get_prop") and root.get_prop(u"documentRoot")) or (
-            root.props.get(u"documentRoot")
-        ):
-            roots.append(root)
-
-    anyerrors = []
-    for pos, item in enumerate(vdoc):
-        sourceline = SourceLine(vdoc, pos, Text)
-        success = False
-        for root in roots:
-            success = validate.validate_ex(
-                root,
-                item,
-                loader.identifiers,
-                strict,
-                foreign_properties=loader.foreign_properties,
-                raise_ex=False,
-                skip_foreign_properties=loader.skip_schemas,
-                strict_foreign_properties=strict_foreign_properties,
-            )
-            if success:
-                break
-
-        if not success:
-            errors = []  # type: List[SchemaSaladException]
-            for root in roots:
-                if hasattr(root, "get_prop"):
-                    name = root.get_prop(u"name")
-                elif hasattr(root, "name"):
-                    name = root.name
-
-                try:
-                    validate.validate_ex(
-                        root,
-                        item,
-                        loader.identifiers,
-                        strict,
-                        foreign_properties=loader.foreign_properties,
-                        raise_ex=True,
-                        skip_foreign_properties=loader.skip_schemas,
-                        strict_foreign_properties=strict_foreign_properties,
-                    )
-                except ClassValidationException as exc:
-                    errors = [
-                        ClassValidationException(
-                            "tried `{}` but".format(name), sourceline, [exc]
-                        )
-                    ]
-                    break
-                except ValidationException as exc:
-                    errors.append(
-                        ValidationException(
-                            "tried `{}` but".format(name), sourceline, [exc]
-                        )
-                    )
-
-            objerr = u"Invalid"
-            for ident in loader.identifiers:
-                if ident in item:
-                    objerr = u"Object `{}` is not valid because".format(
-                        relname(item[ident])
-                    )
-                    break
-            anyerrors.append(ValidationException(objerr, sourceline, errors, "-"))
-    if anyerrors:
-        raise ValidationException("", None, anyerrors, "*")
-
-
-def get_anon_name(rec):
-    # type: (MutableMapping[Text, Union[Text, Dict[Text, Text]]]) -> Text
-    """Calculate a reproducible name for anonymous types."""
-    if "name" in rec:
-        name = rec["name"]
-        if isinstance(name, Text):
-            return name
-        raise ValidationException(
-            "Expected name field to be a string, was {}".format(name)
-        )
-    anon_name = u""
-    if rec["type"] in ("enum", saladp + "enum"):
-        for sym in rec["symbols"]:
-            anon_name += sym
-        return "enum_" + hashlib.sha1(anon_name.encode("UTF-8")).hexdigest()
-    if rec["type"] in ("record", saladp + "record"):
-        for field in rec["fields"]:
-            if isinstance(field, Mapping):
-                anon_name += field[u"name"]
-            else:
-                raise ValidationException(
-                    "Expected entries in 'fields' to also be maps, was {}.".format(
-                        field
-                    )
-                )
-        return u"record_" + hashlib.sha1(anon_name.encode("UTF-8")).hexdigest()
-    if rec["type"] in ("array", saladp + "array"):
-        return u""
-    raise ValidationException("Expected enum or record, was {}".format(rec["type"]))
-
-
-def replace_type(items, spec, loader, found, find_embeds=True, deepen=True):
-    # type: (Any, Dict[Text, Any], Loader, Set[Text], bool, bool) -> Any
-    """ Go through and replace types in the 'spec' mapping"""
-
-    if isinstance(items, MutableMapping):
-        # recursively check these fields for types to replace
-        if items.get("type") in ("record", "enum") and items.get("name"):
-            if items["name"] in found:
-                return items["name"]
-            found.add(items["name"])
-
-        if not deepen:
-            return items
-
-        items = copy.copy(items)
-        if not items.get("name"):
-            items["name"] = get_anon_name(items)
-        for name in ("type", "items", "fields"):
-            if name in items:
-                items[name] = replace_type(
-                    items[name],
-                    spec,
-                    loader,
-                    found,
-                    find_embeds=find_embeds,
-                    deepen=find_embeds,
-                )
-                if isinstance(items[name], MutableSequence):
-                    items[name] = flatten(items[name])
-
-        return items
-    if isinstance(items, MutableSequence):
-        # recursively transform list
-        return [
-            replace_type(i, spec, loader, found, find_embeds=find_embeds, deepen=deepen)
-            for i in items
-        ]
-    if isinstance(items, string_types):
-        # found a string which is a symbol corresponding to a type.
-        replace_with = None
-        if items in loader.vocab:
-            # If it's a vocabulary term, first expand it to its fully qualified
-            # URI
-            items = loader.vocab[items]
-
-        if items in spec:
-            # Look up in specialization map
-            replace_with = spec[items]
-
-        if replace_with:
-            return replace_type(
-                replace_with, spec, loader, found, find_embeds=find_embeds
-            )
-        found.add(items)
-    return items
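-
-# A sketch of the specialization lookup on plain type names (assumes a
-# loader whose vocab does not contain these hypothetical names):
-#
-#     >>> replace_type(["#Base", "string"], {"#Base": "#Derived"}, loader, set())
-#     ['#Derived', 'string']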
-
-
-def avro_name(url):  # type: (Text) -> Text
-    """
-    Turn a URL into an Avro-safe name.
-
-    If the URL has a fragment, return the part after its last slash, or the
-    whole fragment if it contains no slash; otherwise return the URL
-    unchanged.
-    """
-    frg = urllib.parse.urldefrag(url)[1]
-    if frg != "":
-        if "/" in frg:
-            return frg[frg.rindex("/") + 1 :]
-        return frg
-    return url
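-
-# Behaviour sketch:
-#
-#     >>> avro_name("https://w3id.org/cwl/salad#RecordSchema")
-#     'RecordSchema'
-#     >>> avro_name("https://w3id.org/cwl/salad#RecordSchema/type")
-#     'type'
-#     >>> avro_name("https://w3id.org/cwl/salad")
-#     'https://w3id.org/cwl/salad'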
-
-
-Avro = TypeVar("Avro", Dict[Text, Any], List[Any], Text)
-
-
-def make_valid_avro(
-    items,  # type: Avro
-    alltypes,  # type: Dict[Text, Dict[Text, Any]]
-    found,  # type: Set[Text]
-    union=False,  # type: bool
-):  # type: (...) -> Union[Avro, Dict[Text, Text], Text]
-    """Convert our schema to be more avro like."""
-    # Possibly could be integrated into our fork of avro/schema.py?
-    if isinstance(items, MutableMapping):
-        items = copy.copy(items)
-        if items.get("name") and items.get("inVocab", True):
-            items["name"] = avro_name(items["name"])
-
-        if "type" in items and items["type"] in (
-            saladp + "record",
-            saladp + "enum",
-            "record",
-            "enum",
-        ):
-            if (hasattr(items, "get") and items.get("abstract")) or (
-                "abstract" in items
-            ):
-                return items
-            if items["name"] in found:
-                return cast(Text, items["name"])
-            found.add(items["name"])
-        for field in ("type", "items", "values", "fields"):
-            if field in items:
-                items[field] = make_valid_avro(
-                    items[field], alltypes, found, union=True
-                )
-        if "symbols" in items:
-            items["symbols"] = [avro_name(sym) for sym in items["symbols"]]
-        return items
-    if isinstance(items, MutableSequence):
-        ret = []
-        for i in items:
-            ret.append(make_valid_avro(i, alltypes, found, union=union))
-        return ret
-    if union and isinstance(items, string_types):
-        if items in alltypes and avro_name(items) not in found:
-            return cast(
-                Dict[Text, Text],
-                make_valid_avro(alltypes[items], alltypes, found, union=union),
-            )
-        items = avro_name(items)
-    return items
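-
-# For example, a URI-named record is renamed in place and registered in
-# `found` so later references collapse to the bare name (a sketch):
-#
-#     >>> found = set()
-#     >>> make_valid_avro(
-#     ...     {"name": saladp + "EnumSchema", "type": "record", "fields": []},
-#     ...     {}, found)
-#     {'name': 'EnumSchema', 'type': 'record', 'fields': []}
-#     >>> found
-#     {'EnumSchema'}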
-
-
-def deepcopy_strip(item):  # type: (Any) -> Any
-    """
-    Make a deep copy of list and dict objects.
-
-    Intentionally do not copy attributes.  This is to discard CommentedMap and
-    CommentedSeq metadata which is very expensive with regular copy.deepcopy.
-    """
-
-    if isinstance(item, MutableMapping):
-        return {k: deepcopy_strip(v) for k, v in iteritems(item)}
-    if isinstance(item, MutableSequence):
-        return [deepcopy_strip(k) for k in item]
-    return item
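-
-# e.g. a ruamel.yaml CommentedMap tree comes back as plain dicts and lists:
-#
-#     >>> plain = deepcopy_strip(CommentedMap({"a": CommentedSeq([1, 2])}))
-#     >>> type(plain), type(plain["a"])
-#     (<class 'dict'>, <class 'list'>)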
-
-
-def extend_and_specialize(items, loader):
-    # type: (List[Dict[Text, Any]], Loader) -> List[Dict[Text, Any]]
-    """
-    Apply 'extend' and 'specialize' to fully materialize derived record types.
-    """
-
-    items = deepcopy_strip(items)
-    types = {i["name"]: i for i in items}  # type: Dict[Text, Any]
-    results = []
-
-    for stype in items:
-        if "extends" in stype:
-            specs = {}  # type: Dict[Text, Text]
-            if "specialize" in stype:
-                for spec in aslist(stype["specialize"]):
-                    specs[spec["specializeFrom"]] = spec["specializeTo"]
-
-            exfields = []  # type: List[Text]
-            exsym = []  # type: List[Text]
-            for ex in aslist(stype["extends"]):
-                if ex not in types:
-                    raise ValidationException(
-                        "Extends {} in {} refers to invalid base type.".format(
-                            stype["extends"], stype["name"]
-                        )
-                    )
-
-                basetype = copy.copy(types[ex])
-
-                if stype["type"] == "record":
-                    if specs:
-                        basetype["fields"] = replace_type(
-                            basetype.get("fields", []), specs, loader, set()
-                        )
-
-                    for field in basetype.get("fields", []):
-                        if "inherited_from" not in field:
-                            field["inherited_from"] = ex
-
-                    exfields.extend(basetype.get("fields", []))
-                elif stype["type"] == "enum":
-                    exsym.extend(basetype.get("symbols", []))
-
-            if stype["type"] == "record":
-                stype = copy.copy(stype)
-                exfields.extend(stype.get("fields", []))
-                stype["fields"] = exfields
-
-                fieldnames = set()  # type: Set[Text]
-                for field in stype["fields"]:
-                    if field["name"] in fieldnames:
-                        raise ValidationException(
-                            "Field name {} appears twice in {}".format(
-                                field["name"], stype["name"]
-                            )
-                        )
-                    else:
-                        fieldnames.add(field["name"])
-            elif stype["type"] == "enum":
-                stype = copy.copy(stype)
-                exsym.extend(stype.get("symbols", []))
-                stype["symbol"] = exsym
-
-            types[stype["name"]] = stype
-
-        results.append(stype)
-
-    ex_types = {}
-    for result in results:
-        ex_types[result["name"]] = result
-
-    extended_by = {}  # type: Dict[Text, Text]
-    for result in results:
-        if "extends" in result:
-            for ex in aslist(result["extends"]):
-                if ex_types[ex].get("abstract"):
-                    add_dictlist(extended_by, ex, ex_types[result["name"]])
-                    add_dictlist(extended_by, avro_name(ex), ex_types[ex])
-
-    for result in results:
-        if result.get("abstract") and result["name"] not in extended_by:
-            raise ValidationException(
-                "{} is abstract but missing a concrete subtype".format(result["name"])
-            )
-
-    for result in results:
-        if "fields" in result:
-            result["fields"] = replace_type(
-                result["fields"], extended_by, loader, set()
-            )
-
-    return results
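-
-# A compact sketch of "extends" (hypothetical two-record schema; `loader` is
-# only consulted for specialization lookups here, and inherited fields are
-# tagged with "inherited_from" as above):
-#
-#     >>> base = {"name": "Base", "type": "record", "abstract": True,
-#     ...         "fields": [{"name": "id", "type": "string"}]}
-#     >>> child = {"name": "Child", "type": "record", "extends": "Base",
-#     ...          "fields": [{"name": "label", "type": "string"}]}
-#     >>> merged = extend_and_specialize([base, child], loader)
-#     >>> [f["name"] for t in merged if t["name"] == "Child"
-#     ...  for f in t["fields"]]
-#     ['id', 'label']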
-
-
-def make_avro(
-    i,  # type: List[Dict[Text, Any]]
-    loader,  # type: Loader
-):  # type: (...) -> List[Any]
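-    """Generate Avro-compatible dicts, dropping abstract and documentation types."""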
-
-    j = extend_and_specialize(i, loader)
-
-    name_dict = {}  # type: Dict[Text, Dict[Text, Any]]
-    for entry in j:
-        name_dict[entry["name"]] = entry
-    avro = make_valid_avro(j, name_dict, set())
-
-    return [
-        t
-        for t in avro
-        if isinstance(t, MutableMapping)
-        and not t.get("abstract")
-        and t.get("type") != "documentation"
-    ]
-
-
-def make_avro_schema(
-    i,  # type: List[Any]
-    loader,  # type: Loader
-):  # type: (...) -> Names
-    """
-    All in one convenience function.
-
-    Call make_avro() and make_avro_schema_from_avro() separately if you need
-    the intermediate result for diagnostic output.
-    """
-    names = Names()
-    avro = make_avro(i, loader)
-    make_avsc_object(convert_to_dict(avro), names)
-    return names
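-
-# The two-step form keeps the intermediate Avro list around for debugging
-# (a sketch; schema_doc and document_loader are hypothetical names for a
-# parsed schema document and its loader):
-#
-#     >>> avro = make_avro(schema_doc, document_loader)
-#     >>> text = json_dumps(avro, indent=4)  # inspect on SchemaParseException
-#     >>> avsc_names = make_avro_schema_from_avro(avro)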
-
-
-def make_avro_schema_from_avro(avro):
-    # type: (List[Union[Avro, Dict[Text, Text], Text]]) -> Names
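-    """Create an avro.schema.Names object from an already-built Avro list."""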
-    names = Names()
-    make_avsc_object(convert_to_dict(avro), names)
-    return names
-
-
-def shortname(inputid):  # type: (Text) -> Text
-    """Returns the last segment of the provided fragment or path."""
-    parsed_id = urllib.parse.urlparse(inputid)
-    if parsed_id.fragment:
-        return parsed_id.fragment.split(u"/")[-1]
-    return parsed_id.path.split(u"/")[-1]
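-
-# Behaviour sketch:
-#
-#     >>> shortname("https://w3id.org/cwl/salad#RecordSchema/fields")
-#     'fields'
-#     >>> shortname("https://example.com/path/to/thing")
-#     'thing'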
-
-
-def print_inheritance(doc, stream):
-    # type: (List[Dict[Text, Any]], IO[Any]) -> None
-    """Write a Grapviz inheritance graph for the supplied document."""
-    stream.write("digraph {\n")
-    for entry in doc:
-        if entry["type"] == "record":
-            label = name = shortname(entry["name"])
-            fields = entry.get("fields", [])
-            if fields:
-                label += "\\n* {}\\l".format(
-                    "\\l* ".join(shortname(field["name"]) for field in fields)
-                )
-            shape = "ellipse" if entry.get("abstract") else "box"
-            stream.write('"{}" [shape={} label="{}"];\n'.format(name, shape, label))
-            if "extends" in entry:
-                for target in aslist(entry["extends"]):
-                    stream.write('"{}" -> "{}";\n'.format(shortname(target), name))
-    stream.write("}\n")
-
-
-def print_fieldrefs(doc, loader, stream):
-    # type: (List[Dict[Text, Any]], Loader, IO[Any]) -> None
-    """Write a GraphViz graph of the relationships between the fields."""
-    obj = extend_and_specialize(doc, loader)
-
-    primitives = set(
-        (
-            "http://www.w3.org/2001/XMLSchema#string",
-            "http://www.w3.org/2001/XMLSchema#boolean",
-            "http://www.w3.org/2001/XMLSchema#int",
-            "http://www.w3.org/2001/XMLSchema#long",
-            saladp + "null",
-            saladp + "enum",
-            saladp + "array",
-            saladp + "record",
-            saladp + "Any",
-        )
-    )
-
-    stream.write("digraph {\n")
-    for entry in obj:
-        if entry.get("abstract"):
-            continue
-        if entry["type"] == "record":
-            label = shortname(entry["name"])
-            for field in entry.get("fields", []):
-                found = set()  # type: Set[Text]
-                field_name = shortname(field["name"])
-                replace_type(field["type"], {}, loader, found, find_embeds=False)
-                for each_type in found:
-                    if each_type not in primitives:
-                        stream.write(
-                            '"{}" -> "{}" [label="{}"];\n'.format(
-                                label, shortname(each_type), field_name
-                            )
-                        )
-    stream.write("}\n")