Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/pack.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
"""Reformat a CWL document and all its references to be a single stream."""

import copy
import urllib.parse
from typing import (
    Any,
    Callable,
    Dict,
    MutableMapping,
    MutableSequence,
    Optional,
    Set,
    Union,
    cast,
)

from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.ref_resolver import Loader, SubLoader
from schema_salad.utils import ResolveType

from .context import LoadingContext
from .load_tool import fetch_document, resolve_and_validate_document
from .process import shortname, uniquename
from .update import ORDERED_VERSIONS, update
from .utils import CWLObjectType, CWLOutputType

LoadRefType = Callable[[Optional[str], str], ResolveType]


def find_run(
    d: Union[CWLObjectType, ResolveType],
    loadref: LoadRefType,
    runs: Set[str],
) -> None:
    """Collect every string-valued ``run`` entry reachable from *d*.

    Each ``run`` string not already in *runs* is added to it, loaded with
    ``loadref(None, ref)`` and searched recursively.  *runs* is mutated in
    place; nothing is returned.
    """
    if isinstance(d, MutableSequence):
        for s in d:
            find_run(s, loadref, runs)
    elif isinstance(d, MutableMapping):
        if "run" in d and isinstance(d["run"], str):
            # Only load a reference the first time it is seen, which also
            # stops the recursion on documents that refer to each other.
            if d["run"] not in runs:
                runs.add(d["run"])
                find_run(loadref(None, d["run"]), loadref, runs)
        for s in d.values():
            find_run(s, loadref, runs)


def find_ids(
    d: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType], None],
    ids: Set[str],
) -> None:
    """Add every string ``id`` or ``name`` value found anywhere in *d* to *ids*.

    Sequences and mappings are walked recursively; any other value is
    ignored.  *ids* is mutated in place.
    """
    if isinstance(d, MutableSequence):
        for s in d:
            find_ids(cast(CWLObjectType, s), ids)
    elif isinstance(d, MutableMapping):
        for i in ("id", "name"):
            if i in d and isinstance(d[i], str):
                ids.add(cast(str, d[i]))
        for s2 in d.values():
            find_ids(cast(CWLOutputType, s2), ids)


def replace_refs(d: Any, rewrite: Dict[str, str], stem: str, newstem: str) -> None:
    """Rewrite string references inside *d* in place.

    A string that is a key of *rewrite* is replaced by its mapped value.
    Otherwise a string starting with *stem* has that prefix replaced by
    *newstem*, and the old -> new pair is recorded in *rewrite*.  Nested
    sequences and mappings are processed recursively.
    """
    if isinstance(d, MutableSequence):
        for s, v in enumerate(d):
            if isinstance(v, str):
                if v in rewrite:
                    d[s] = rewrite[v]
                elif v.startswith(stem):
                    d[s] = newstem + v[len(stem) :]
                    rewrite[v] = d[s]
            else:
                replace_refs(v, rewrite, stem, newstem)
    elif isinstance(d, MutableMapping):
        # Only values of existing keys are reassigned, so the mapping's size
        # does not change while it is being iterated.
        for key, val in d.items():
            if isinstance(val, str):
                if val in rewrite:
                    d[key] = rewrite[val]
                elif val.startswith(stem):
                    id_ = val[len(stem) :]
                    # prevent appending newstems if tool is already packed
                    if id_.startswith(newstem.strip("#")):
                        d[key] = "#" + id_
                    else:
                        d[key] = newstem + id_
                    rewrite[val] = d[key]
            # No-op for strings; descends into nested containers.
            replace_refs(val, rewrite, stem, newstem)


def import_embed(
    d: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType],
    seen: Set[str],
) -> None:
    """Replace repeated identified mappings in *d* with ``$import`` stubs.

    The first mapping carrying a given string ``id``/``name`` is kept and its
    identifier is added to *seen*.  Any later mapping with an identifier
    already in *seen* is emptied and reduced to ``{"$import": identifier}``.
    Both *d* and *seen* are mutated in place.
    """
    if isinstance(d, MutableSequence):
        for v in d:
            import_embed(cast(CWLOutputType, v), seen)
    elif isinstance(d, MutableMapping):
        for n in ("id", "name"):
            if n in d:
                if isinstance(d[n], str):
                    ident = cast(str, d[n])
                    if ident in seen:
                        this = ident
                        d.clear()
                        d["$import"] = this
                    else:
                        this = ident
                        seen.add(this)
                        break

        # Sorted so that which occurrence counts as "first" does not depend
        # on key insertion order.
        for k in sorted(d.keys()):
            import_embed(cast(CWLOutputType, d[k]), seen)


def pack(
    loadingContext: LoadingContext,
    uri: str,
    rewrite_out: Optional[Dict[str, str]] = None,
    loader: Optional[Loader] = None,
) -> CWLObjectType:
    """Combine the document at *uri* and everything it runs into one object.

    :param loadingContext: context to load with; it is copied, not modified.
    :param uri: location of the document to pack.
    :param rewrite_out: optional dict that receives the mapping from original
        identifiers to their packed ``#...`` identifiers.
    :param loader: optional loader to base the document loader on; falls back
        to ``loadingContext.loader`` and then to a fresh ``Loader``.
    :returns: a mapping with a ``$graph`` list of the packed processes and a
        ``cwlVersion`` (plus ``$schemas`` when any were found).
    :raises Exception: if the loading context ends up without a loader.
    """
    # The workflow document we have in memory right now may have been
    # updated to the internal CWL version.  We need to reload the
    # document to go back to its original version.
    #
    # What's going on here is that the updater replaces the
    # documents/fragments in the index with updated ones, the
    # index is also used as a cache, so we need to go through the
    # loading process with an empty index and updating turned off
    # so we have the original un-updated documents.
    #
    loadingContext = loadingContext.copy()
    document_loader = SubLoader(loader or loadingContext.loader or Loader({}))
    loadingContext.do_update = False
    loadingContext.loader = document_loader
    loadingContext.loader.idx = {}
    loadingContext.metadata = {}
    loadingContext, docobj, uri = fetch_document(uri, loadingContext)
    loadingContext, _ = resolve_and_validate_document(
        loadingContext, docobj, uri, preprocess_only=True
    )
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be none")
    processobj, metadata = loadingContext.loader.resolve_ref(uri)
    document_loader = loadingContext.loader

    if isinstance(processobj, MutableMapping):
        document_loader.idx[processobj["id"]] = CommentedMap(processobj.items())
    elif isinstance(processobj, MutableSequence):
        _, frag = urllib.parse.urldefrag(uri)
        for po in processobj:
            # With no fragment in the URI, the entry whose id ends in
            # "#main" becomes the document being packed.
            if not frag:
                if po["id"].endswith("#main"):
                    uri = po["id"]
            document_loader.idx[po["id"]] = CommentedMap(po.items())
        document_loader.idx[metadata["id"]] = CommentedMap(metadata.items())

    found_versions: Set[str] = {cast(str, loadingContext.metadata["cwlVersion"])}

    def loadref(base: Optional[str], lr_uri: str) -> ResolveType:
        """Load and validate *lr_uri*, recording the cwlVersion it declares."""
        lr_loadingContext = loadingContext.copy()
        lr_loadingContext.metadata = {}
        lr_loadingContext, lr_workflowobj, lr_uri = fetch_document(
            lr_uri, lr_loadingContext
        )
        lr_loadingContext, lr_uri = resolve_and_validate_document(
            lr_loadingContext, lr_workflowobj, lr_uri
        )
        found_versions.add(cast(str, lr_loadingContext.metadata["cwlVersion"]))
        if lr_loadingContext.loader is None:
            raise Exception("loader should not be None")
        return lr_loadingContext.loader.resolve_ref(lr_uri, base_url=base)[0]

    ids: Set[str] = set()
    find_ids(processobj, ids)

    runs = {uri}
    find_run(processobj, loadref, runs)

    # Figure out the highest version, everything needs to be updated
    # to it.  found_versions always holds at least the main document's
    # version, so max() never sees an empty set.
    update_to_version = max(found_versions, key=ORDERED_VERSIONS.index)

    for f in runs:
        find_ids(document_loader.resolve_ref(f)[0], ids)

    names: Set[str] = set()
    rewrite: Dict[str, str] = {} if rewrite_out is None else rewrite_out

    mainpath, _ = urllib.parse.urldefrag(uri)

    def rewrite_id(r: str, mainuri: str) -> None:
        """Record in ``rewrite`` the packed identifier for *r*."""
        if r == mainuri:
            rewrite[r] = "#main"
        elif r.startswith(mainuri) and r[len(mainuri)] in ("#", "/"):
            if r[len(mainuri) :].startswith("#main/"):
                rewrite[r] = "#" + uniquename(r[len(mainuri) + 1 :], names)
            else:
                rewrite[r] = "#" + uniquename("main/" + r[len(mainuri) + 1 :], names)
        else:
            path, frag = urllib.parse.urldefrag(r)
            if path == mainpath:
                rewrite[r] = "#" + uniquename(frag, names)
            else:
                if path not in rewrite:
                    rewrite[path] = "#" + uniquename(shortname(path), names)

    # Sorted so that uniquename() hands out names in a stable order.
    for r in sorted(ids):
        rewrite_id(r, uri)

    packed = CommentedMap(
        (("$graph", CommentedSeq()), ("cwlVersion", update_to_version))
    )
    namespaces = metadata.get("$namespaces", None)

    schemas: Set[str] = set()
    if "$schemas" in metadata:
        for each_schema in metadata["$schemas"]:
            schemas.add(each_schema)
    for r in sorted(runs):
        dcr, metadata = document_loader.resolve_ref(r)
        if isinstance(dcr, CommentedSeq):
            dcr = dcr[0]
            dcr = cast(CommentedMap, dcr)
        if not isinstance(dcr, MutableMapping):
            continue

        dcr = update(
            dcr,
            document_loader,
            r,
            loadingContext.enable_dev,
            metadata,
            update_to_version,
        )

        if "http://commonwl.org/cwltool#original_cwlVersion" in metadata:
            del metadata["http://commonwl.org/cwltool#original_cwlVersion"]
        if "http://commonwl.org/cwltool#original_cwlVersion" in dcr:
            del dcr["http://commonwl.org/cwltool#original_cwlVersion"]

        if "$schemas" in metadata:
            for s in metadata["$schemas"]:
                schemas.add(s)
        if dcr.get("class") not in ("Workflow", "CommandLineTool", "ExpressionTool"):
            continue
        dc = cast(Dict[str, Any], copy.deepcopy(dcr))
        v = rewrite[r]
        dc["id"] = v
        # These keys are emitted once at the top level of the packed
        # document instead of on every graph entry.
        for n in ("name", "cwlVersion", "$namespaces", "$schemas"):
            if n in dc:
                del dc[n]
        packed["$graph"].append(dc)

    if schemas:
        packed["$schemas"] = list(schemas)

    # Iterate over a snapshot of the keys: replace_refs() adds entries to
    # ``rewrite`` while it runs.
    for r in list(rewrite.keys()):
        v = rewrite[r]
        replace_refs(packed, rewrite, r + "/" if "#" in r else r + "#", v + "/")

    import_embed(packed, set())

    if len(packed["$graph"]) == 1:
        # duplicate 'cwlVersion' and $schemas inside $graph when there is only
        # a single item because we will print the contents inside '$graph'
        # rather than whole dict
        packed["$graph"][0]["cwlVersion"] = packed["cwlVersion"]
        if schemas:
            packed["$graph"][0]["$schemas"] = list(schemas)
    # always include $namespaces in the #main
    if namespaces:
        packed["$graph"][0]["$namespaces"] = namespaces

    return packed