Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/pack.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Reformat a CWL document and all its references to be a single stream.""" | |
2 | |
3 import copy | |
4 import urllib | |
5 from typing import ( | |
6 Any, | |
7 Callable, | |
8 Dict, | |
9 MutableMapping, | |
10 MutableSequence, | |
11 Optional, | |
12 Set, | |
13 Union, | |
14 cast, | |
15 ) | |
16 | |
17 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
18 from schema_salad.ref_resolver import Loader, SubLoader | |
19 from schema_salad.utils import ResolveType | |
20 | |
21 from .context import LoadingContext | |
22 from .load_tool import fetch_document, resolve_and_validate_document | |
23 from .process import shortname, uniquename | |
24 from .update import ORDERED_VERSIONS, update | |
25 from .utils import CWLObjectType, CWLOutputType | |
26 | |
# Type of the ``loadref`` callback consumed by ``find_run``: it is called as
# ``loadref(base, uri)`` with an optional base URI (``find_run`` passes
# ``None``) plus a reference string, and returns the loaded document.
LoadRefType = Callable[[Optional[str], str], ResolveType]
28 | |
29 | |
def find_run(
    d: Union[CWLObjectType, ResolveType],
    loadref: LoadRefType,
    runs: Set[str],
) -> None:
    """Add to *runs* every string ``run`` reference reachable from *d*.

    Lists and mappings are walked recursively. When a mapping has a string
    ``run`` value that is not yet in *runs*, it is recorded. The document
    returned by ``loadref(None, ref)`` is then walked as well. References
    already present in *runs* are never loaded again, so reference cycles
    terminate.

    :param d: document fragment to scan.
    :param loadref: callback that returns the document for a ``run`` reference.
    :param runs: references collected so far; updated in place.
    """
    if isinstance(d, MutableSequence):
        for item in d:
            find_run(item, loadref, runs)
        return
    if not isinstance(d, MutableMapping):
        return

    target = d["run"] if "run" in d else None
    if isinstance(target, str) and target not in runs:
        runs.add(target)
        find_run(loadref(None, target), loadref, runs)

    # Embedded (non-string) ``run`` values and all other nested data are
    # reached through the generic walk over the mapping's values.
    for child in d.values():
        find_run(child, loadref, runs)
45 | |
46 | |
def find_ids(
    d: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType], None],
    ids: Set[str],
) -> None:
    """Add every string ``id`` and ``name`` value found anywhere in *d* to *ids*.

    Lists and mappings are walked recursively. All other values, and
    non-string ``id``/``name`` values, are ignored.

    :param d: document fragment to scan.
    :param ids: identifiers collected so far; updated in place.
    """
    if isinstance(d, MutableSequence):
        for entry in d:
            find_ids(cast(CWLObjectType, entry), ids)
        return
    if not isinstance(d, MutableMapping):
        return

    ids.update(
        cast(str, d[field])
        for field in ("id", "name")
        if field in d and isinstance(d[field], str)
    )
    for child in d.values():
        find_ids(cast(CWLOutputType, child), ids)
60 | |
61 | |
def replace_refs(d: Any, rewrite: Dict[str, str], stem: str, newstem: str) -> None:
    """Rewrite, in place, the string references under *stem* found in *d*.

    Every string held in a list or as a mapping value (at any depth) is
    treated as follows:

    * if it is already a key of *rewrite*, it is replaced by the mapped value;
    * otherwise, if it starts with *stem*, that prefix is replaced and the
      resulting pair is recorded in *rewrite*.

    For list items the replacement prefix is always *newstem*. For mapping
    values, a remainder that already begins with ``newstem.strip("#")``
    (i.e. a reference that looks already packed) only gets a bare ``#``,
    so the new stem is not applied twice. Mapping keys are never rewritten.

    :param d: document fragment to rewrite.
    :param rewrite: original -> rewritten strings; read and extended in place.
    :param stem: prefix marking references that should be rewritten.
    :param newstem: replacement for *stem*.
    """
    if isinstance(d, MutableSequence):
        for pos, item in enumerate(d):
            if not isinstance(item, str):
                replace_refs(item, rewrite, stem, newstem)
            elif item in rewrite:
                d[pos] = rewrite[item]
            elif item.startswith(stem):
                d[pos] = rewrite[item] = newstem + item[len(stem) :]
    elif isinstance(d, MutableMapping):
        packed_prefix = newstem.strip("#")
        for key, value in d.items():
            if isinstance(value, str):
                if value in rewrite:
                    d[key] = rewrite[value]
                elif value.startswith(stem):
                    suffix = value[len(stem) :]
                    # prevent appending newstems if tool is already packed
                    if suffix.startswith(packed_prefix):
                        replacement = "#" + suffix
                    else:
                        replacement = newstem + suffix
                    d[key] = rewrite[value] = replacement
            # Recurse into nested containers; for strings this is a no-op.
            replace_refs(value, rewrite, stem, newstem)
87 | |
88 | |
def import_embed(
    d: Union[MutableSequence[CWLObjectType], CWLObjectType, CWLOutputType],
    seen: Set[str],
) -> None:
    """Replace repeated definitions inside *d* with ``$import`` references, in place.

    The walk is depth-first: lists in order, mapping keys in sorted order.
    A mapping's identifier is the first of its ``id``/``name`` entries that
    holds a string. The first mapping met with a given identifier is kept
    and the identifier is added to *seen*. Any later mapping with the same
    identifier is emptied and becomes ``{"$import": identifier}``.

    :param d: document fragment to process.
    :param seen: identifiers already encountered; updated in place.
    """
    if isinstance(d, MutableSequence):
        for element in d:
            import_embed(cast(CWLOutputType, element), seen)
        return
    if not isinstance(d, MutableMapping):
        return

    ident = next(
        (
            cast(str, d[key])
            for key in ("id", "name")
            if key in d and isinstance(d[key], str)
        ),
        None,
    )
    if ident is not None:
        if ident in seen:
            # Duplicate definition: keep only a pointer to the first one.
            d.clear()
            d["$import"] = ident
        else:
            seen.add(ident)

    for key in sorted(d):
        import_embed(cast(CWLOutputType, d[key]), seen)
112 | |
113 | |
def pack(
    loadingContext: LoadingContext,
    uri: str,
    rewrite_out: Optional[Dict[str, str]] = None,
    loader: Optional[Loader] = None,
) -> CWLObjectType:
    """Pack the CWL document at *uri*, and everything it ``run``s, into one document.

    The result is a ``CommentedMap`` with these keys:

    * ``$graph``: every Workflow / CommandLineTool / ExpressionTool reached
      from *uri*, with identifiers rewritten to document-local ``#...`` form
      (the entry point becomes ``#main``);
    * ``cwlVersion``: the newest version found among the packed documents;
      every entry is updated to it;
    * ``$schemas`` and ``$namespaces``: present when the source documents
      declare them.

    When ``$graph`` has exactly one entry, ``cwlVersion``, ``$schemas`` and
    ``$namespaces`` are also copied into that entry.

    :param loadingContext: loading options. The function works on copies, so
        the caller's context attributes are not reassigned.
    :param uri: URI of the document to pack, optionally with a ``#fragment``.
    :param rewrite_out: if given, this dict is used (and filled in) as the
        map from original identifiers to packed identifiers.
    :param loader: loader to wrap in a fresh ``SubLoader``. Falls back to
        ``loadingContext.loader`` and then to an empty ``Loader``.
    :returns: the packed document.
    :raises Exception: if document loading leaves no loader available.
    """
    # The workflow document we have in memory right now may have been
    # updated to the internal CWL version. We need to reload the
    # document to go back to its original version.
    #
    # What's going on here is that the updater replaces the
    # documents/fragments in the index with updated ones, the
    # index is also used as a cache, so we need to go through the
    # loading process with an empty index and updating turned off
    # so we have the original un-updated documents.
    #
    loadingContext = loadingContext.copy()
    document_loader = SubLoader(loader or loadingContext.loader or Loader({}))
    loadingContext.do_update = False
    loadingContext.loader = document_loader
    loadingContext.loader.idx = {}
    loadingContext.metadata = {}
    loadingContext, docobj, uri = fetch_document(uri, loadingContext)
    loadingContext, _ = resolve_and_validate_document(
        loadingContext, docobj, uri, preprocess_only=True
    )
    if loadingContext.loader is None:
        raise Exception("loadingContext.loader cannot be none")
    processobj, metadata = loadingContext.loader.resolve_ref(uri)
    document_loader = loadingContext.loader

    # Store the un-updated main document (or each entry of its $graph) in the
    # loader index, which the loader also consults as a cache.
    if isinstance(processobj, MutableMapping):
        document_loader.idx[processobj["id"]] = CommentedMap(processobj.items())
    elif isinstance(processobj, MutableSequence):
        _, frag = urllib.parse.urldefrag(uri)
        for po in processobj:
            # Without an explicit fragment, the "#main" entry is the entry point.
            if not frag and po["id"].endswith("#main"):
                uri = po["id"]
            document_loader.idx[po["id"]] = CommentedMap(po.items())
        document_loader.idx[metadata["id"]] = CommentedMap(metadata.items())

    # Original cwlVersion of every document involved; all of them are
    # updated to the newest one further down.
    found_versions = {
        cast(str, loadingContext.metadata["cwlVersion"])
    }  # type: Set[str]

    def loadref(base: Optional[str], lr_uri: str) -> ResolveType:
        """Fetch and validate *lr_uri*, record its cwlVersion, return the document."""
        lr_loadingContext = loadingContext.copy()
        lr_loadingContext.metadata = {}
        lr_loadingContext, lr_workflowobj, lr_uri = fetch_document(
            lr_uri, lr_loadingContext
        )
        lr_loadingContext, lr_uri = resolve_and_validate_document(
            lr_loadingContext, lr_workflowobj, lr_uri
        )
        found_versions.add(cast(str, lr_loadingContext.metadata["cwlVersion"]))
        if lr_loadingContext.loader is None:
            raise Exception("loader should not be None")
        return lr_loadingContext.loader.resolve_ref(lr_uri, base_url=base)[0]

    ids = set()  # type: Set[str]
    find_ids(processobj, ids)

    runs = {uri}
    find_run(processobj, loadref, runs)

    # Figure out the highest version; everything needs to be updated to it.
    update_to_version = ORDERED_VERSIONS[
        max(ORDERED_VERSIONS.index(fv) for fv in found_versions)
    ]

    for f in runs:
        find_ids(document_loader.resolve_ref(f)[0], ids)

    names = set()  # type: Set[str]
    if rewrite_out is None:
        rewrite = {}  # type: Dict[str, str]
    else:
        rewrite = rewrite_out

    mainpath, _ = urllib.parse.urldefrag(uri)

    def rewrite_id(r: str, mainuri: str) -> None:
        """Record in ``rewrite`` the packed ``#...`` identifier for *r*.

        The main process becomes ``#main`` and anything nested under it
        becomes ``#main/...``. Fragments of the main file keep their fragment.
        Other documents get one entry, for their path, named after the
        document's short name. ``uniquename`` keeps all names distinct.
        """
        if r == mainuri:
            rewrite[r] = "#main"
        elif r.startswith(mainuri) and r[len(mainuri)] in ("#", "/"):
            if r[len(mainuri) :].startswith("#main/"):
                rewrite[r] = "#" + uniquename(r[len(mainuri) + 1 :], names)
            else:
                rewrite[r] = "#" + uniquename("main/" + r[len(mainuri) + 1 :], names)
        else:
            path, frag = urllib.parse.urldefrag(r)
            if path == mainpath:
                rewrite[r] = "#" + uniquename(frag, names)
            elif path not in rewrite:
                rewrite[path] = "#" + uniquename(shortname(path), names)

    for r in sorted(ids):
        rewrite_id(r, uri)

    packed = CommentedMap(
        (("$graph", CommentedSeq()), ("cwlVersion", update_to_version))
    )
    namespaces = metadata.get("$namespaces", None)

    schemas = set()  # type: Set[str]
    if "$schemas" in metadata:
        for each_schema in metadata["$schemas"]:
            schemas.add(each_schema)

    for r in sorted(runs):
        dcr, run_metadata = document_loader.resolve_ref(r)
        if isinstance(dcr, CommentedSeq):
            dcr = dcr[0]
        dcr = cast(CommentedMap, dcr)
        if not isinstance(dcr, MutableMapping):
            continue

        dcr = update(
            dcr,
            document_loader,
            r,
            loadingContext.enable_dev,
            run_metadata,
            update_to_version,
        )

        # Keep cwltool's internal original-version marker out of the output.
        if "http://commonwl.org/cwltool#original_cwlVersion" in run_metadata:
            del run_metadata["http://commonwl.org/cwltool#original_cwlVersion"]
        if "http://commonwl.org/cwltool#original_cwlVersion" in dcr:
            del dcr["http://commonwl.org/cwltool#original_cwlVersion"]

        if "$schemas" in run_metadata:
            for s in run_metadata["$schemas"]:
                schemas.add(s)
        if dcr.get("class") not in ("Workflow", "CommandLineTool", "ExpressionTool"):
            continue
        dc = cast(Dict[str, Any], copy.deepcopy(dcr))
        v = rewrite[r]
        dc["id"] = v
        # Document-level keys are emitted once for the whole packed document.
        for n in ("name", "cwlVersion", "$namespaces", "$schemas"):
            if n in dc:
                del dc[n]
        packed["$graph"].append(dc)

    if schemas:
        # sorted(): iteration order of a set of str changes between
        # interpreter runs (hash randomization), which would make the packed
        # output non-reproducible.
        packed["$schemas"] = sorted(schemas)

    # Rewrite every reference under each original identifier. Identifiers
    # that already carry a fragment are followed by "/", bare documents by "#".
    for r in list(rewrite.keys()):
        v = rewrite[r]
        replace_refs(packed, rewrite, r + "/" if "#" in r else r + "#", v + "/")

    import_embed(packed, set())

    if namespaces:
        # $namespaces applies to the whole packed document, so it belongs at
        # the root. $graph[0] is not necessarily #main and may not exist.
        packed["$namespaces"] = namespaces

    if len(packed["$graph"]) == 1:
        # Duplicate 'cwlVersion', $schemas and $namespaces inside $graph when
        # there is only a single item, because the contents of '$graph' may be
        # printed rather than the whole dict.
        sole = packed["$graph"][0]
        sole["cwlVersion"] = packed["cwlVersion"]
        if schemas:
            sole["$schemas"] = sorted(schemas)
        if namespaces:
            # A separate copy, so the same object is not referenced twice.
            sole["$namespaces"] = copy.deepcopy(namespaces)

    return packed