Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/load_tool.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
author | shellac |
---|---|
date | Thu, 14 May 2020 14:56:58 -0400 |
parents | 26e78fe6e8c4 |
children |
comparison
equal
deleted
inserted
replaced
1:75ca89e9b81c | 2:6af9afd405e9 |
---|---|
1 """Loads a CWL document.""" | |
2 from __future__ import absolute_import | |
3 | |
4 import hashlib | |
5 import logging | |
6 import os | |
7 import re | |
8 import uuid | |
9 from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence, | |
10 Optional, Tuple, Union, cast) | |
11 | |
12 import requests.sessions | |
13 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
14 from schema_salad import schema | |
15 from schema_salad.ref_resolver import (ContextType, # pylint: disable=unused-import | |
16 Fetcher, Loader, file_uri, SubLoader) | |
17 from schema_salad.sourceline import SourceLine, cmap | |
18 from schema_salad.validate import ValidationException | |
19 from six import itervalues, string_types | |
20 from six.moves import urllib | |
21 from typing_extensions import Text # pylint: disable=unused-import | |
22 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
23 | |
24 from . import process, update | |
25 from .context import LoadingContext # pylint: disable=unused-import | |
26 from .errors import WorkflowException | |
27 from .loghandler import _logger | |
28 from .process import (Process, get_schema, # pylint: disable=unused-import | |
29 shortname) | |
30 from .software_requirements import ( # pylint: disable=unused-import | |
31 DependenciesConfiguration) | |
32 from .update import ALLUPDATES | |
33 from .utils import json_dumps | |
34 | |
35 | |
36 | |
37 | |
# Context handed to ``Loader`` by ``default_loader()``.  It declares the
# "cwl" and "cwltool" namespace prefixes and marks the "path" and "location"
# fields (and "id" itself) as identifier-valued.
jobloaderctx = {
    u"cwl": "https://w3id.org/cwl/cwl#",
    u"cwltool": "http://commonwl.org/cwltool#",
    u"path": {
        u"@type": u"@id",
    },
    u"location": {
        u"@type": u"@id",
    },
    u"id": u"@id",
}  # type: ContextType
45 | |
46 | |
# Context used by ``resolve_overrides()`` / ``load_overrides()`` to load
# cwltool override documents: "overrideTarget" is identifier-valued, and both
# the overrides list and "requirements" are keyed through "mapSubject".
overrides_ctx = {
    u"overrideTarget": {
        u"@type": u"@id",
    },
    u"cwltool": "http://commonwl.org/cwltool#",
    u"http://commonwl.org/cwltool#overrides": {
        "@id": "cwltool:overrides",
        "mapSubject": "overrideTarget",
    },
    "requirements": {
        "@id": "https://w3id.org/cwl/cwl#requirements",
        "mapSubject": "class",
    },
}  # type: ContextType
59 | |
60 | |
# Callable that builds a ``Fetcher`` from a Text-keyed dict and a
# ``requests`` session.
FetcherConstructorType = Callable[
    [
        Dict[Text, Union[Text, bool]],
        requests.sessions.Session,
    ],
    Fetcher,
]

# Callable invoked as ``resolver(document_loader, reference)`` that returns
# the resolved reference as Text (see ``resolve_tool_uri``).
ResolverType = Callable[
    [
        Loader,
        Union[Text, Dict[Text, Any]],
    ],
    Text,
]
64 | |
def default_loader(fetcher_constructor=None, enable_dev=False):
    # type: (Optional[FetcherConstructorType], bool) -> Loader
    """Create a ``Loader`` configured with ``jobloaderctx``.

    :param fetcher_constructor: passed through to ``Loader`` unchanged.
    :param enable_dev: value returned by the ``allow_attachments`` callback
        given to the loader, whatever argument the callback receives.
    :return: the newly constructed ``Loader``.
    """
    def _allow_attachments(r):
        # type: (Any) -> bool
        # The decision does not depend on the argument, only on enable_dev.
        return enable_dev

    return Loader(
        jobloaderctx,
        fetcher_constructor=fetcher_constructor,
        allow_attachments=_allow_attachments,
    )
69 | |
def resolve_tool_uri(argsworkflow,              # type: Text
                     resolver=None,             # type: Optional[ResolverType]
                     fetcher_constructor=None,  # type: Optional[FetcherConstructorType]
                     document_loader=None       # type: Optional[Loader]
                    ):  # type: (...) -> Tuple[Text, Text]
    """Resolve a tool reference to a URI.

    The reference is tried, in order, as:

    1. an ``http``, ``https`` or ``file`` URI, which is used verbatim;
    2. a path that exists on the local filesystem, converted with
       ``file_uri``;
    3. whatever ``resolver`` returns for it (when a resolver is given).  A
       default loader is built from ``fetcher_constructor`` if no
       ``document_loader`` was supplied.

    :return: ``(uri, fileuri)`` where ``fileuri`` is ``uri`` without its
        fragment.
    :raises ValidationException: if none of the strategies produced a URI.
    """
    uri = None  # type: Optional[Text]

    # Only these schemes count as URIs: on Windows, urlsplit mistakes a drive
    # letter for a scheme, so any other "scheme" is handled as a path.
    scheme = urllib.parse.urlsplit(argsworkflow).scheme
    if scheme in (u'http', u'https', u'file'):
        uri = argsworkflow
    else:
        local_path = os.path.abspath(argsworkflow)
        if os.path.exists(local_path):
            uri = file_uri(str(local_path))
        elif resolver is not None:
            if document_loader is None:
                document_loader = default_loader(fetcher_constructor)
            uri = resolver(document_loader, argsworkflow)

    if uri is None:
        raise ValidationException("Not found: '%s'" % argsworkflow)

    if uri != argsworkflow:
        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)

    fileuri, _fragment = urllib.parse.urldefrag(uri)
    return uri, fileuri
96 | |
97 | |
def fetch_document(argsworkflow,        # type: Union[Text, Dict[Text, Any]]
                   loadingContext=None  # type: Optional[LoadingContext]
                  ):  # type: (...) -> Tuple[LoadingContext, CommentedMap, Text]
    """Retrieve a CWL document.

    :param argsworkflow: either a string reference (resolved with
        ``resolve_tool_uri`` and fetched through the loader) or an
        already-parsed ``dict`` document.
    :param loadingContext: context to work from.  The caller's object is
        never modified: it is copied, or a fresh one is created when
        ``None``.  A default loader is attached if the context has none.
    :return: ``(loadingContext, workflowobj, uri)``.
    :raises ValidationException: if ``argsworkflow`` is neither a string nor
        a ``dict``.
    """
    if loadingContext is not None:
        loadingContext = loadingContext.copy()
    else:
        loadingContext = LoadingContext()
        loadingContext.loader = default_loader()
    if loadingContext.loader is None:
        loadingContext.loader = default_loader(loadingContext.fetcher_constructor)

    if isinstance(argsworkflow, string_types):
        uri, fileuri = resolve_tool_uri(
            argsworkflow,
            resolver=loadingContext.resolver,
            document_loader=loadingContext.loader)
        fetched = loadingContext.loader.fetch(fileuri)
        return loadingContext, fetched, uri

    if isinstance(argsworkflow, dict):
        # In-memory document: use its own id, or mint a "_:"-prefixed one,
        # and register the document in the loader index under that id.
        if argsworkflow.get("id"):
            uri = argsworkflow["id"]
        else:
            uri = "_:" + Text(uuid.uuid4())
        wrapped = cast(CommentedMap, cmap(argsworkflow, fn=uri))
        loadingContext.loader.idx[uri] = wrapped
        return loadingContext, wrapped, uri

    raise ValidationException("Must be URI or object: '%s'" % argsworkflow)
122 | |
123 | |
124 def _convert_stdstreams_to_files(workflowobj): | |
125 # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None | |
126 | |
127 if isinstance(workflowobj, MutableMapping): | |
128 if workflowobj.get('class') == 'CommandLineTool': | |
129 with SourceLine(workflowobj, "outputs", ValidationException, | |
130 _logger.isEnabledFor(logging.DEBUG)): | |
131 outputs = workflowobj.get('outputs', []) | |
132 if not isinstance(outputs, CommentedSeq): | |
133 raise ValidationException('"outputs" section is not ' | |
134 'valid.') | |
135 for out in workflowobj.get('outputs', []): | |
136 if not isinstance(out, CommentedMap): | |
137 raise ValidationException( | |
138 "Output '{}' is not a valid " | |
139 "OutputParameter.".format(out)) | |
140 for streamtype in ['stdout', 'stderr']: | |
141 if out.get('type') == streamtype: | |
142 if 'outputBinding' in out: | |
143 raise ValidationException( | |
144 "Not allowed to specify outputBinding when" | |
145 " using %s shortcut." % streamtype) | |
146 if streamtype in workflowobj: | |
147 filename = workflowobj[streamtype] | |
148 else: | |
149 filename = Text( | |
150 hashlib.sha1( # nosec | |
151 json_dumps(workflowobj, sort_keys=True | |
152 ).encode('utf-8')).hexdigest()) | |
153 workflowobj[streamtype] = filename | |
154 out['type'] = 'File' | |
155 out['outputBinding'] = cmap({'glob': filename}) | |
156 for inp in workflowobj.get('inputs', []): | |
157 if inp.get('type') == 'stdin': | |
158 if 'inputBinding' in inp: | |
159 raise ValidationException( | |
160 "Not allowed to specify inputBinding when" | |
161 " using stdin shortcut.") | |
162 if 'stdin' in workflowobj: | |
163 raise ValidationException( | |
164 "Not allowed to specify stdin path when" | |
165 " using stdin type shortcut.") | |
166 else: | |
167 workflowobj['stdin'] = \ | |
168 "$(inputs.%s.path)" % \ | |
169 inp['id'].rpartition('#')[2] | |
170 inp['type'] = 'File' | |
171 else: | |
172 for entry in itervalues(workflowobj): | |
173 _convert_stdstreams_to_files(entry) | |
174 if isinstance(workflowobj, MutableSequence): | |
175 for entry in workflowobj: | |
176 _convert_stdstreams_to_files(entry) | |
177 | |
178 def _add_blank_ids(workflowobj): | |
179 # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None | |
180 | |
181 if isinstance(workflowobj, MutableMapping): | |
182 if ("run" in workflowobj and | |
183 isinstance(workflowobj["run"], MutableMapping) and | |
184 "id" not in workflowobj["run"] and | |
185 "$import" not in workflowobj["run"]): | |
186 workflowobj["run"]["id"] = Text(uuid.uuid4()) | |
187 for entry in itervalues(workflowobj): | |
188 _add_blank_ids(entry) | |
189 if isinstance(workflowobj, MutableSequence): | |
190 for entry in workflowobj: | |
191 _add_blank_ids(entry) | |
192 | |
def resolve_and_validate_document(
        loadingContext,         # type: LoadingContext
        workflowobj,            # type: Union[CommentedMap, CommentedSeq]
        uri,                    # type: Text
        preprocess_only=False,  # type: bool
        skip_schemas=None,      # type: Optional[bool]
        ):
    # type: (...) -> Tuple[LoadingContext, Text]
    """Validate a CWL document.

    Works on a copy of ``loadingContext``.  The steps are, in order:

    1. If ``workflowobj`` contains ``cwl:tool`` it is treated as an input
       object: it is resolved, the referenced tool is fetched and becomes
       the document, and the remaining input object is kept as job defaults.
    2. Determine ``cwlVersion`` from the context metadata, the document, or
       (for a fragment) the enclosing file, and check that it is supported.
    3. Move ``cwltool:overrides`` / ``cwl:requirements`` from the input
       object into the context's overrides list.
    4. Resolve the document with a loader built for that version's schema
       and convert the stdin/stdout/stderr shortcuts.
    5. Unless ``preprocess_only``: optionally validate against the schema,
       optionally update the document, and record loader, schema names and
       metadata on the returned context.

    :return: ``(loadingContext, uri)``; the context is the modified copy.
    :raises ValueError: if the context has no loader or ``workflowobj`` is
        not a mapping.
    :raises ValidationException: for a missing, non-string or unsupported
        ``cwlVersion``, misplaced ``cwl:requirements``, or a resolution
        result of unexpected type.
    """
    loader = loadingContext.loader
    if not loader:
        raise ValueError("loadingContext must have a loader.")
    loadingContext = loadingContext.copy()

    if not isinstance(workflowobj, MutableMapping):
        raise ValueError("workflowjobj must be a dict, got '{}': {}".format(
            type(workflowobj), workflowobj))

    jobobj = None
    if "cwl:tool" in workflowobj:
        # NOTE(review): the expanded key is read back from workflowobj right
        # after resolve_all(), so this relies on the loader having expanded
        # the "cwl:tool" key in place -- confirm against schema_salad.
        jobobj, _ = loader.resolve_all(workflowobj, uri)
        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
        del cast(Dict[Text, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]

        workflowobj = fetch_document(uri, loadingContext)[1]

    fileuri = urllib.parse.urldefrag(uri)[0]

    cwlVersion = (loadingContext.metadata.get("cwlVersion")
                  or workflowobj.get("cwlVersion"))
    if not cwlVersion and fileuri != uri:
        # The tool we're loading is a fragment of a bigger file.  Get
        # the document root element and look for cwlVersion there.
        metadata = fetch_document(fileuri, loadingContext)[1]  # type: Dict[Text, Any]
        cwlVersion = metadata.get("cwlVersion")
    if not cwlVersion:
        raise ValidationException(
            "No cwlVersion found. "
            "Use the following syntax in your CWL document to declare "
            "the version: cwlVersion: <version>.\n"
            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
            "will need to be upgraded first.")

    if not isinstance(cwlVersion, string_types):
        with SourceLine(workflowobj, "cwlVersion", ValidationException):
            raise ValidationException("'cwlVersion' must be a string, "
                                      "got {}".format(
                                          type(cwlVersion)))
    # Strip a leading "cwl:" prefix or the expanded CWL namespace.
    cwlVersion = re.sub(
        r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
        cwlVersion)
    known_versions = list(ALLUPDATES)
    if cwlVersion not in known_versions:
        # List every supported version in the error message.
        versions = sorted(
            version + " (with --enable-dev flag only)"
            if "dev" in version else version
            for version in known_versions)
        raise ValidationException(
            "The CWL reference runner no longer supports pre CWL v1.0 "
            "documents. Supported versions are: "
            "\n{}".format("\n".join(versions)))

    if isinstance(jobobj, CommentedMap):
        if "http://commonwl.org/cwltool#overrides" in jobobj:
            loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
            del jobobj["http://commonwl.org/cwltool#overrides"]

        if "https://w3id.org/cwl/cwl#requirements" in jobobj:
            if cwlVersion not in ("v1.1.0-dev1", "v1.1"):
                raise ValidationException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater.")
            loadingContext.overrides_list.append({
                "overrideTarget": uri,
                "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"],
            })
            del jobobj["https://w3id.org/cwl/cwl#requirements"]

    sch_document_loader, avsc_names = process.get_schema(cwlVersion)[:2]

    # get_schema() may hand back an exception in place of the schema names.
    if isinstance(avsc_names, Exception):
        raise avsc_names

    processobj = None  # type: Union[CommentedMap, CommentedSeq, Text, None]
    # Loader built from the schema's context, graph and cache, but sharing
    # the document index of the caller's loader.
    document_loader = Loader(
        sch_document_loader.ctx,
        schemagraph=sch_document_loader.graph,
        idx=loader.idx,
        cache=sch_document_loader.cache,
        fetcher_constructor=loadingContext.fetcher_constructor,
        skip_schemas=skip_schemas)

    if cwlVersion == "v1.0":
        _add_blank_ids(workflowobj)

    processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
    # Metadata already present on the context takes precedence.
    if loadingContext.metadata:
        metadata = loadingContext.metadata
    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")
    if not isinstance(metadata, CommentedMap):
        raise ValidationException("metadata must be a CommentedMap, was %s" % type(metadata))

    if isinstance(processobj, CommentedMap):
        uri = processobj["id"]

    _convert_stdstreams_to_files(workflowobj)

    if preprocess_only:
        return loadingContext, uri

    if loadingContext.do_validate:
        schema.validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)

    # None means default behavior (do update)
    if loadingContext.do_update in (True, None):
        if "cwlVersion" not in metadata:
            metadata["cwlVersion"] = cwlVersion
        processobj = update.update(
            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata)
        document_loader.idx[processobj["id"]] = processobj

    if jobobj is not None:
        loadingContext.jobdefaults = jobobj

    loadingContext.loader = document_loader
    loadingContext.avsc_names = avsc_names
    loadingContext.metadata = metadata

    return loadingContext, uri
326 | |
327 | |
def make_tool(uri,            # type: Union[Text, CommentedMap, CommentedSeq]
              loadingContext  # type: LoadingContext
             ):  # type: (...) -> Process
    """Make a Python CWL object.

    ``uri`` is resolved through ``loadingContext.loader``.  If it resolves
    to a sequence of objects, the one whose ``id`` ends in ``#main`` is
    used; if it resolves to a mapping, that mapping is used directly.  The
    chosen object is passed to ``loadingContext.construct_tool_object``.
    When the context carries job defaults, each tool input whose short name
    appears there gets that value as its ``default``.

    :raises ValueError: if the context has no loader.
    :raises WorkflowException: if a sequence has no ``#main`` entry.
    :raises Exception: if the reference resolves to neither a sequence nor
        a mapping.
    """
    if loadingContext.loader is None:
        raise ValueError("loadingContext must have a loader")
    resolveduri, _ = loadingContext.loader.resolve_ref(uri)

    processobj = None
    if isinstance(resolveduri, MutableSequence):
        for obj in resolveduri:
            # Entries without an "id" cannot be "#main"; skip them instead
            # of failing with a KeyError, so the error below can be raised.
            if "id" in obj and obj["id"].endswith('#main'):
                processobj = obj
                break
        if not processobj:
            raise WorkflowException(
                u"Tool file contains graph of multiple objects, must specify "
                "one of #%s" % ", #".join(
                    urllib.parse.urldefrag(i["id"])[1] for i in resolveduri
                    if "id" in i))
    elif isinstance(resolveduri, MutableMapping):
        processobj = resolveduri
    else:
        raise Exception("Must resolve to list or dict")

    tool = loadingContext.construct_tool_object(processobj, loadingContext)

    if loadingContext.jobdefaults:
        jobobj = loadingContext.jobdefaults
        for inp in tool.tool["inputs"]:
            name = shortname(inp["id"])
            if name in jobobj:
                inp["default"] = jobobj[name]

    return tool
362 | |
363 | |
def load_tool(argsworkflow,        # type: Union[Text, Dict[Text, Any]]
              loadingContext=None  # type: Optional[LoadingContext]
             ):  # type: (...) -> Process
    """Fetch, validate and instantiate a tool in one call.

    Chains ``fetch_document``, ``resolve_and_validate_document`` and
    ``make_tool``, passing the context and URI each stage returns on to the
    next.
    """
    fetched_ctx, document, document_uri = fetch_document(
        argsworkflow, loadingContext)
    validated_ctx, tool_uri = resolve_and_validate_document(
        fetched_ctx, document, document_uri)
    return make_tool(tool_uri, validated_ctx)
376 | |
def resolve_overrides(ov,       # type: CommentedMap
                      ov_uri,   # type: Text
                      baseurl   # type: Text
                     ):  # type: (...) -> List[Dict[Text, Any]]
    """Resolve an overrides document and return its list of overrides.

    ``ov`` is first resolved against ``baseurl`` with a loader built on
    ``overrides_ctx``, then resolved again against ``ov_uri`` with the
    document loader of the "v1.0" schema.

    :return: the value stored under the ``cwltool#overrides`` key.
    :raises Exception: if the first resolution does not yield a
        ``CommentedMap``.
    """
    overrides_loader = Loader(overrides_ctx)
    resolved, _ = overrides_loader.resolve_all(ov, baseurl)
    if not isinstance(resolved, CommentedMap):
        raise Exception("Expected CommentedMap, got %s" % type(resolved))

    schema_loader = get_schema("v1.0")[0]
    schema_loader.resolve_all(resolved, ov_uri)

    overrides = resolved["http://commonwl.org/cwltool#overrides"]
    return cast(List[Dict[Text, Any]], overrides)
389 | |
def load_overrides(ov, base_url):  # type: (Text, Text) -> List[Dict[Text, Any]]
    """Fetch the overrides document at ``ov`` and resolve it.

    The document is fetched with a loader built on ``overrides_ctx`` and
    handed to ``resolve_overrides`` with ``ov`` as its own URI.
    """
    fetched = Loader(overrides_ctx).fetch(ov)
    return resolve_overrides(fetched, ov, base_url)