Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/pack.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/cwltool/pack.py Thu May 14 14:56:58 2020 -0400 @@ -0,0 +1,223 @@ +"""Reformat a CWL document and all its references to be a single stream.""" +from __future__ import absolute_import + +import copy +from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence, + Optional, Set, Union, cast) + +from ruamel.yaml.comments import CommentedMap, CommentedSeq +from schema_salad.ref_resolver import Loader # pylint: disable=unused-import +from schema_salad.ref_resolver import SubLoader +from schema_salad.sourceline import cmap +from six import iteritems, string_types +from six.moves import urllib +from typing_extensions import Text # pylint: disable=unused-import +# move to a regular typing import when Python 3.3-3.6 is no longer supported + +from .process import shortname, uniquename + + +def flatten_deps(d, files): # type: (Any, Set[Text]) -> None + if isinstance(d, MutableSequence): + for s in d: + flatten_deps(s, files) + elif isinstance(d, MutableMapping): + if d["class"] == "File": + files.add(d["location"]) + if "secondaryFiles" in d: + flatten_deps(d["secondaryFiles"], files) + if "listing" in d: + flatten_deps(d["listing"], files) + +LoadRefType = Callable[[Optional[Text], Text], Union[Dict[Text, Any], List[Dict[Text, Any]], Text, None]] + + +def find_run(d, # type: Any + loadref, # type: LoadRefType + runs # type: Set[Text] + ): # type: (...) -> None + if isinstance(d, MutableSequence): + for s in d: + find_run(s, loadref, runs) + elif isinstance(d, MutableMapping): + if "run" in d and isinstance(d["run"], string_types): + if d["run"] not in runs: + runs.add(d["run"]) + find_run(loadref(None, d["run"]), loadref, runs) + for s in d.values(): + find_run(s, loadref, runs) + + +def find_ids(d, ids): # type: (Any, Set[Text]) -> None + if isinstance(d, MutableSequence): + for s in d: + find_ids(s, ids) + elif isinstance(d, MutableMapping): + for i in ("id", "name"): + if i in d and isinstance(d[i], string_types): + ids.add(d[i]) + for s in d.values(): + find_ids(s, ids) + + +def replace_refs(d, rewrite, stem, newstem): + # type: (Any, Dict[Text, Text], Text, Text) -> None + if isinstance(d, MutableSequence): + for s, v in enumerate(d): + if isinstance(v, string_types): + if v in rewrite: + d[s] = rewrite[v] + elif v.startswith(stem): + d[s] = newstem + v[len(stem):] + rewrite[v] = d[s] + else: + replace_refs(v, rewrite, stem, newstem) + elif isinstance(d, MutableMapping): + for s, v in d.items(): + if isinstance(v, string_types): + if v in rewrite: + d[s] = rewrite[v] + elif v.startswith(stem): + id_ = v[len(stem):] + # prevent appending newstems if tool is already packed + if id_.startswith(newstem.strip("#")): + d[s] = "#" + id_ + else: + d[s] = newstem + id_ + rewrite[v] = d[s] + replace_refs(v, rewrite, stem, newstem) + +def import_embed(d, seen): + # type: (Any, Set[Text]) -> None + if isinstance(d, MutableSequence): + for v in d: + import_embed(v, seen) + elif isinstance(d, MutableMapping): + for n in ("id", "name"): + if n in d: + if d[n] in seen: + this = d[n] + d.clear() + d["$import"] = this + else: + this = d[n] + seen.add(this) + break + + for k in sorted(d.keys()): + import_embed(d[k], seen) + + +def pack(document_loader, # type: Loader + processobj, # type: Union[Dict[Text, Any], List[Dict[Text, Any]]] + uri, # type: Text + metadata, # type: Dict[Text, Text] + rewrite_out=None # type: Optional[Dict[Text, Text]] + ): # type: (...) -> Dict[Text, Any] + + document_loader = SubLoader(document_loader) + document_loader.idx = {} + if isinstance(processobj, MutableMapping): + document_loader.idx[processobj["id"]] = CommentedMap(iteritems(processobj)) + elif isinstance(processobj, MutableSequence): + _, frag = urllib.parse.urldefrag(uri) + for po in processobj: + if not frag: + if po["id"].endswith("#main"): + uri = po["id"] + document_loader.idx[po["id"]] = CommentedMap(iteritems(po)) + document_loader.idx[metadata["id"]] = CommentedMap(iteritems(metadata)) + + def loadref(base, uri): + # type: (Optional[Text], Text) -> Union[Dict[Text, Any], List[Dict[Text, Any]], Text, None] + return document_loader.resolve_ref(uri, base_url=base)[0] + + ids = set() # type: Set[Text] + find_ids(processobj, ids) + + runs = {uri} + find_run(processobj, loadref, runs) + + for f in runs: + find_ids(document_loader.resolve_ref(f)[0], ids) + + names = set() # type: Set[Text] + if rewrite_out is None: + rewrite = {} # type: Dict[Text, Text] + else: + rewrite = rewrite_out + + mainpath, _ = urllib.parse.urldefrag(uri) + + def rewrite_id(r, mainuri): + # type: (Text, Text) -> None + if r == mainuri: + rewrite[r] = "#main" + elif r.startswith(mainuri) and r[len(mainuri)] in ("#", "/"): + if r[len(mainuri):].startswith("#main/"): + rewrite[r] = "#" + uniquename(r[len(mainuri)+1:], names) + else: + rewrite[r] = "#" + uniquename("main/"+r[len(mainuri)+1:], names) + else: + path, frag = urllib.parse.urldefrag(r) + if path == mainpath: + rewrite[r] = "#" + uniquename(frag, names) + else: + if path not in rewrite: + rewrite[path] = "#" + uniquename(shortname(path), names) + + sortedids = sorted(ids) + + for r in sortedids: + rewrite_id(r, uri) + + packed = CommentedMap((("$graph", CommentedSeq()), + ("cwlVersion", metadata["cwlVersion"]))) + namespaces = metadata.get('$namespaces', None) + + schemas = set() # type: Set[Text] + if '$schemas' in metadata: + for each_schema in metadata["$schemas"]: + schemas.add(each_schema) + for r in sorted(runs): + dcr, metadata = document_loader.resolve_ref(r) + if isinstance(dcr, CommentedSeq): + dcr = dcr[0] + dcr = cast(CommentedMap, dcr) + if not isinstance(dcr, MutableMapping): + continue + metadata = cast(Dict[Text, Any], metadata) + if "$schemas" in metadata: + for s in metadata["$schemas"]: + schemas.add(s) + if dcr.get("class") not in ("Workflow", "CommandLineTool", "ExpressionTool"): + continue + dc = cast(Dict[Text, Any], copy.deepcopy(dcr)) + v = rewrite[r] + dc["id"] = v + for n in ("name", "cwlVersion", "$namespaces", "$schemas"): + if n in dc: + del dc[n] + packed["$graph"].append(dc) + + if schemas: + packed["$schemas"] = list(schemas) + + for r in list(rewrite.keys()): + v = rewrite[r] + replace_refs(packed, rewrite, r + "/" if "#" in r else r + "#", v + "/") + + import_embed(packed, set()) + + if len(packed["$graph"]) == 1: + # duplicate 'cwlVersion' and $schemas inside $graph when there is only + # a single item because we will print the contents inside '$graph' + # rather than whole dict + packed["$graph"][0]["cwlVersion"] = packed["cwlVersion"] + if schemas: + packed["$graph"][0]["$schemas"] = list(schemas) + # always include $namespaces in the #main + if namespaces: + packed["$graph"][0]["$namespaces"] = namespaces + + return packed