view env/lib/python3.7/site-packages/cwltool/pack.py @ 3:758bc20232e8 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:20:52 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

"""Reformat a CWL document and all its references to be a single stream."""
from __future__ import absolute_import

import copy
from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
                    Optional, Set, Union, cast)

from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.ref_resolver import Loader  # pylint: disable=unused-import
from schema_salad.ref_resolver import SubLoader
from schema_salad.sourceline import cmap
from six import iteritems, string_types
from six.moves import urllib
from typing_extensions import Text  # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from .process import shortname, uniquename


def flatten_deps(d, files):  # type: (Any, Set[Text]) -> None
    if isinstance(d, MutableSequence):
        for s in d:
            flatten_deps(s, files)
    elif isinstance(d, MutableMapping):
        if d["class"] == "File":
            files.add(d["location"])
        if "secondaryFiles" in d:
            flatten_deps(d["secondaryFiles"], files)
        if "listing" in d:
            flatten_deps(d["listing"], files)

LoadRefType = Callable[[Optional[Text], Text], Union[Dict[Text, Any], List[Dict[Text, Any]], Text, None]]


def find_run(d,        # type: Any
             loadref,  # type: LoadRefType
             runs      # type: Set[Text]
            ):  # type: (...) -> None
    if isinstance(d, MutableSequence):
        for s in d:
            find_run(s, loadref, runs)
    elif isinstance(d, MutableMapping):
        if "run" in d and isinstance(d["run"], string_types):
            if d["run"] not in runs:
                runs.add(d["run"])
                find_run(loadref(None, d["run"]), loadref, runs)
        for s in d.values():
            find_run(s, loadref, runs)


def find_ids(d, ids):  # type: (Any, Set[Text]) -> None
    if isinstance(d, MutableSequence):
        for s in d:
            find_ids(s, ids)
    elif isinstance(d, MutableMapping):
        for i in ("id", "name"):
            if i in d and isinstance(d[i], string_types):
                ids.add(d[i])
        for s in d.values():
            find_ids(s, ids)


def replace_refs(d, rewrite, stem, newstem):
    # type: (Any, Dict[Text, Text], Text, Text) -> None
    if isinstance(d, MutableSequence):
        for s, v in enumerate(d):
            if isinstance(v, string_types):
                if v in rewrite:
                    d[s] = rewrite[v]
                elif v.startswith(stem):
                    d[s] = newstem + v[len(stem):]
                    rewrite[v] = d[s]
            else:
                replace_refs(v, rewrite, stem, newstem)
    elif isinstance(d, MutableMapping):
        for s, v in d.items():
            if isinstance(v, string_types):
                if v in rewrite:
                    d[s] = rewrite[v]
                elif v.startswith(stem):
                    id_ = v[len(stem):]
                    # prevent appending newstems if tool is already packed
                    if id_.startswith(newstem.strip("#")):
                        d[s] = "#" + id_
                    else:
                        d[s] = newstem + id_
                    rewrite[v] = d[s]
            replace_refs(v, rewrite, stem, newstem)

def import_embed(d, seen):
    # type: (Any, Set[Text]) -> None
    if isinstance(d, MutableSequence):
        for v in d:
            import_embed(v, seen)
    elif isinstance(d, MutableMapping):
        for n in ("id", "name"):
            if n in d:
                if d[n] in seen:
                    this = d[n]
                    d.clear()
                    d["$import"] = this
                else:
                    this = d[n]
                    seen.add(this)
                    break

        for k in sorted(d.keys()):
            import_embed(d[k], seen)


def pack(document_loader,  # type: Loader
         processobj,       # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
         uri,              # type: Text
         metadata,         # type: Dict[Text, Text]
         rewrite_out=None  # type: Optional[Dict[Text, Text]]
        ):  # type: (...) -> Dict[Text, Any]

    document_loader = SubLoader(document_loader)
    document_loader.idx = {}
    if isinstance(processobj, MutableMapping):
        document_loader.idx[processobj["id"]] = CommentedMap(iteritems(processobj))
    elif isinstance(processobj, MutableSequence):
        _, frag = urllib.parse.urldefrag(uri)
        for po in processobj:
            if not frag:
                if po["id"].endswith("#main"):
                    uri = po["id"]
            document_loader.idx[po["id"]] = CommentedMap(iteritems(po))
        document_loader.idx[metadata["id"]] = CommentedMap(iteritems(metadata))

    def loadref(base, uri):
        # type: (Optional[Text], Text) -> Union[Dict[Text, Any], List[Dict[Text, Any]], Text, None]
        return document_loader.resolve_ref(uri, base_url=base)[0]

    ids = set()  # type: Set[Text]
    find_ids(processobj, ids)

    runs = {uri}
    find_run(processobj, loadref, runs)

    for f in runs:
        find_ids(document_loader.resolve_ref(f)[0], ids)

    names = set()  # type: Set[Text]
    if rewrite_out is None:
        rewrite = {}  # type: Dict[Text, Text]
    else:
        rewrite = rewrite_out

    mainpath, _ = urllib.parse.urldefrag(uri)

    def rewrite_id(r, mainuri):
        # type: (Text, Text) -> None
        if r == mainuri:
            rewrite[r] = "#main"
        elif r.startswith(mainuri) and r[len(mainuri)] in ("#", "/"):
            if r[len(mainuri):].startswith("#main/"):
                rewrite[r] = "#" + uniquename(r[len(mainuri)+1:], names)
            else:
                rewrite[r] = "#" + uniquename("main/"+r[len(mainuri)+1:], names)
        else:
            path, frag = urllib.parse.urldefrag(r)
            if path == mainpath:
                rewrite[r] = "#" + uniquename(frag, names)
            else:
                if path not in rewrite:
                    rewrite[path] = "#" + uniquename(shortname(path), names)

    sortedids = sorted(ids)

    for r in sortedids:
        rewrite_id(r, uri)

    packed = CommentedMap((("$graph", CommentedSeq()),
                           ("cwlVersion", metadata["cwlVersion"])))
    namespaces = metadata.get('$namespaces', None)

    schemas = set()  # type: Set[Text]
    if '$schemas' in metadata:
        for each_schema in metadata["$schemas"]:
            schemas.add(each_schema)
    for r in sorted(runs):
        dcr, metadata = document_loader.resolve_ref(r)
        if isinstance(dcr, CommentedSeq):
            dcr = dcr[0]
            dcr = cast(CommentedMap, dcr)
        if not isinstance(dcr, MutableMapping):
            continue
        metadata = cast(Dict[Text, Any], metadata)
        if "$schemas" in metadata:
            for s in metadata["$schemas"]:
                schemas.add(s)
        if dcr.get("class") not in ("Workflow", "CommandLineTool", "ExpressionTool"):
            continue
        dc = cast(Dict[Text, Any], copy.deepcopy(dcr))
        v = rewrite[r]
        dc["id"] = v
        for n in ("name", "cwlVersion", "$namespaces", "$schemas"):
            if n in dc:
                del dc[n]
        packed["$graph"].append(dc)

    if schemas:
        packed["$schemas"] = list(schemas)

    for r in list(rewrite.keys()):
        v = rewrite[r]
        replace_refs(packed, rewrite, r + "/" if "#" in r else r + "#", v + "/")

    import_embed(packed, set())

    if len(packed["$graph"]) == 1:
        # duplicate 'cwlVersion' and $schemas inside $graph when there is only
        # a single item because we will print the contents inside '$graph'
        # rather than whole dict
        packed["$graph"][0]["cwlVersion"] = packed["cwlVersion"]
        if schemas:
            packed["$graph"][0]["$schemas"] = list(schemas)
    # always include $namespaces in the #main
    if namespaces:
        packed["$graph"][0]["$namespaces"] = namespaces

    return packed