comparison env/lib/python3.7/site-packages/cwltool/load_tool.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """Loads a CWL document."""
2 from __future__ import absolute_import
3
4 import hashlib
5 import logging
6 import os
7 import re
8 import uuid
9 from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence,
10 Optional, Tuple, Union, cast)
11
12 import requests.sessions
13 from ruamel.yaml.comments import CommentedMap, CommentedSeq
14 from schema_salad import schema
15 from schema_salad.ref_resolver import (ContextType, # pylint: disable=unused-import
16 Fetcher, Loader, file_uri, SubLoader)
17 from schema_salad.sourceline import SourceLine, cmap
18 from schema_salad.validate import ValidationException
19 from six import itervalues, string_types
20 from six.moves import urllib
21 from typing_extensions import Text # pylint: disable=unused-import
22 # move to a regular typing import when Python 3.3-3.6 is no longer supported
23
24 from . import process, update
25 from .context import LoadingContext # pylint: disable=unused-import
26 from .errors import WorkflowException
27 from .loghandler import _logger
28 from .process import (Process, get_schema, # pylint: disable=unused-import
29 shortname)
30 from .software_requirements import ( # pylint: disable=unused-import
31 DependenciesConfiguration)
32 from .update import ALLUPDATES
33 from .utils import json_dumps
34
35
36
37
# JSON-LD context used by ``default_loader`` for job/input objects: the
# "path" and "location" fields are typed as identifiers ("@type": "@id")
# and "id" is mapped to "@id".
jobloaderctx = {
    "cwl": "https://w3id.org/cwl/cwl#",
    "cwltool": "http://commonwl.org/cwltool#",
    "path": {"@type": "@id"},
    "location": {"@type": "@id"},
    "id": "@id",
}  # type: ContextType


# JSON-LD context used by ``resolve_overrides`` / ``load_overrides``.
# "overrideTarget" is an identifier; ``cwltool:overrides`` and
# "requirements" declare a schema-salad ``mapSubject`` ("overrideTarget"
# and "class" respectively).
overrides_ctx = {
    "overrideTarget": {"@type": "@id"},
    "cwltool": "http://commonwl.org/cwltool#",
    "http://commonwl.org/cwltool#overrides": {
        "@id": "cwltool:overrides",
        "mapSubject": "overrideTarget",
    },
    "requirements": {
        "@id": "https://w3id.org/cwl/cwl#requirements",
        "mapSubject": "class",
    },
}  # type: ContextType
59
60
# Signature of a factory that builds a schema-salad ``Fetcher`` from a
# str -> (str | bool) dict (presumably the fetch cache -- confirm against
# schema_salad) and a ``requests`` session.
FetcherConstructorType = Callable[
    [Dict[Text, Union[Text, bool]], requests.sessions.Session],
    Fetcher,
]

# Signature of a resolver: given a Loader and a tool reference (string or
# dict), return the URI it resolves to (see ``resolve_tool_uri``).
ResolverType = Callable[
    [Loader, Union[Text, Dict[Text, Any]]],
    Text,
]
64
def default_loader(fetcher_constructor=None, enable_dev=False):
    # type: (Optional[FetcherConstructorType], bool) -> Loader
    """Create a schema-salad Loader for job/input objects.

    The loader uses the ``jobloaderctx`` JSON-LD context and the given
    ``fetcher_constructor``. Attachments are permitted for every URI when
    ``enable_dev`` is true and refused otherwise.
    """
    def _attachments_allowed(_uri):
        # The decision ignores the URI and depends only on ``enable_dev``.
        return enable_dev

    return Loader(
        jobloaderctx,
        fetcher_constructor=fetcher_constructor,
        allow_attachments=_attachments_allowed,
    )
69
def resolve_tool_uri(argsworkflow,  # type: Text
                     resolver=None,  # type: Optional[ResolverType]
                     fetcher_constructor=None,  # type: Optional[FetcherConstructorType]
                     document_loader=None  # type: Optional[Loader]
                     ):  # type: (...) -> Tuple[Text, Text]
    """Turn a user-supplied tool reference into an absolute URI.

    The reference is used as-is when it has an ``http``, ``https`` or
    ``file`` scheme. Otherwise, if it names an existing local path, it is
    converted to a ``file://`` URI. Failing that, ``resolver`` (when given)
    is asked to resolve it. The resolver receives ``document_loader``, or a
    fresh ``default_loader(fetcher_constructor)`` when none was supplied.

    Returns a ``(uri, fileuri)`` pair; ``fileuri`` is ``uri`` with any
    fragment removed.

    Raises ValidationException when nothing resolves the reference.
    """
    # Only these schemes are trusted: urlsplit reports a Windows drive
    # letter (e.g. "C:\\tool.cwl") as a scheme.
    scheme = urllib.parse.urlsplit(argsworkflow).scheme
    uri = None  # type: Optional[Text]
    if scheme in (u'http', u'https', u'file'):
        uri = argsworkflow
    else:
        local_path = os.path.abspath(argsworkflow)
        if os.path.exists(local_path):
            uri = file_uri(str(local_path))
        elif resolver is not None:
            loader = document_loader
            if loader is None:
                loader = default_loader(fetcher_constructor)
            uri = resolver(loader, argsworkflow)

    if uri is None:
        raise ValidationException("Not found: '%s'" % argsworkflow)

    if uri != argsworkflow:
        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)

    return uri, urllib.parse.urldefrag(uri)[0]
96
97
def fetch_document(argsworkflow,  # type: Union[Text, Dict[Text, Any]]
                   loadingContext=None  # type: Optional[LoadingContext]
                   ):  # type: (...) -> Tuple[LoadingContext, CommentedMap, Text]
    """Retrieve a CWL document.

    ``argsworkflow`` is either:

    * a string reference, which is resolved with ``resolve_tool_uri`` (using
      the context's ``resolver``) and fetched with the context's loader; or
    * an in-memory dict. The dict is wrapped with ``cmap`` and registered in
      the loader's ``idx`` under its ``id``, or under a fresh ``_:<uuid>`` id
      when it has no truthy ``id``.

    The given ``loadingContext`` is copied, not modified. When it is omitted,
    a new ``LoadingContext`` with a ``default_loader()`` is used.

    Returns ``(loadingContext, workflowobj, uri)``.

    Raises ValidationException if ``argsworkflow`` is neither a string nor a
    dict.
    """
    if loadingContext is not None:
        loadingContext = loadingContext.copy()
        if loadingContext.loader is None:
            loadingContext.loader = default_loader(loadingContext.fetcher_constructor)
    else:
        loadingContext = LoadingContext()
        loadingContext.loader = default_loader()
    loader = loadingContext.loader

    if isinstance(argsworkflow, string_types):
        uri, fileuri = resolve_tool_uri(argsworkflow,
                                        resolver=loadingContext.resolver,
                                        document_loader=loader)
        return loadingContext, loader.fetch(fileuri), uri

    if isinstance(argsworkflow, dict):
        uri = argsworkflow.get("id")
        if not uri:
            uri = "_:" + Text(uuid.uuid4())
        workflowobj = cast(CommentedMap, cmap(argsworkflow, fn=uri))
        loader.idx[uri] = workflowobj
        return loadingContext, workflowobj, uri

    raise ValidationException("Must be URI or object: '%s'" % argsworkflow)
122
123
124 def _convert_stdstreams_to_files(workflowobj):
125 # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None
126
127 if isinstance(workflowobj, MutableMapping):
128 if workflowobj.get('class') == 'CommandLineTool':
129 with SourceLine(workflowobj, "outputs", ValidationException,
130 _logger.isEnabledFor(logging.DEBUG)):
131 outputs = workflowobj.get('outputs', [])
132 if not isinstance(outputs, CommentedSeq):
133 raise ValidationException('"outputs" section is not '
134 'valid.')
135 for out in workflowobj.get('outputs', []):
136 if not isinstance(out, CommentedMap):
137 raise ValidationException(
138 "Output '{}' is not a valid "
139 "OutputParameter.".format(out))
140 for streamtype in ['stdout', 'stderr']:
141 if out.get('type') == streamtype:
142 if 'outputBinding' in out:
143 raise ValidationException(
144 "Not allowed to specify outputBinding when"
145 " using %s shortcut." % streamtype)
146 if streamtype in workflowobj:
147 filename = workflowobj[streamtype]
148 else:
149 filename = Text(
150 hashlib.sha1( # nosec
151 json_dumps(workflowobj, sort_keys=True
152 ).encode('utf-8')).hexdigest())
153 workflowobj[streamtype] = filename
154 out['type'] = 'File'
155 out['outputBinding'] = cmap({'glob': filename})
156 for inp in workflowobj.get('inputs', []):
157 if inp.get('type') == 'stdin':
158 if 'inputBinding' in inp:
159 raise ValidationException(
160 "Not allowed to specify inputBinding when"
161 " using stdin shortcut.")
162 if 'stdin' in workflowobj:
163 raise ValidationException(
164 "Not allowed to specify stdin path when"
165 " using stdin type shortcut.")
166 else:
167 workflowobj['stdin'] = \
168 "$(inputs.%s.path)" % \
169 inp['id'].rpartition('#')[2]
170 inp['type'] = 'File'
171 else:
172 for entry in itervalues(workflowobj):
173 _convert_stdstreams_to_files(entry)
174 if isinstance(workflowobj, MutableSequence):
175 for entry in workflowobj:
176 _convert_stdstreams_to_files(entry)
177
178 def _add_blank_ids(workflowobj):
179 # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None
180
181 if isinstance(workflowobj, MutableMapping):
182 if ("run" in workflowobj and
183 isinstance(workflowobj["run"], MutableMapping) and
184 "id" not in workflowobj["run"] and
185 "$import" not in workflowobj["run"]):
186 workflowobj["run"]["id"] = Text(uuid.uuid4())
187 for entry in itervalues(workflowobj):
188 _add_blank_ids(entry)
189 if isinstance(workflowobj, MutableSequence):
190 for entry in workflowobj:
191 _add_blank_ids(entry)
192
def resolve_and_validate_document(
        loadingContext,  # type: LoadingContext
        workflowobj,  # type: Union[CommentedMap, CommentedSeq]
        uri,  # type: Text
        preprocess_only=False,  # type: bool
        skip_schemas=None,  # type: Optional[bool]
):
    # type: (...) -> Tuple[LoadingContext, Text]
    """Validate a CWL document.

    Resolves ``workflowobj`` (the document fetched from ``uri``) with a
    Loader built from the schema for its ``cwlVersion``. It then converts
    the stdin/stdout/stderr shortcuts and, unless ``preprocess_only`` is
    set, validates the result (when ``loadingContext.do_validate``) and runs
    ``update.update`` on it (when ``loadingContext.do_update`` is True or
    None).

    If ``workflowobj`` is a job/input object carrying ``cwl:tool``, the tool
    it names is fetched and processed instead. The job object's
    ``cwltool:overrides`` and ``cwl:requirements`` are appended to
    ``overrides_list``. Its remaining fields become ``jobdefaults``, except
    when ``preprocess_only`` returns early.

    Args:
        loadingContext: must have a ``loader``; it is copied, not modified.
        workflowobj: the fetched document; must be a mapping.
        uri: the URI ``workflowobj`` was loaded from.
        preprocess_only: return right after resolution and shortcut
            conversion, skipping validation, update and the context's
            loader/avsc_names/metadata/jobdefaults assignments.
        skip_schemas: passed through to the schema-salad ``Loader``.

    Returns:
        ``(loadingContext, uri)``: the new context copy, and the resolved
        process id when the document resolves to a mapping. Otherwise the
        (possibly ``cwl:tool``-redirected) uri is returned.

    Raises:
        ValueError: the context has no loader, or ``workflowobj`` is not a
            mapping.
        ValidationException: missing, non-string or unsupported
            ``cwlVersion``; ``cwl:requirements`` in the job object of a v1.0
            document; resolved document or metadata of an unexpected type.
    """
    if not loadingContext.loader:
        raise ValueError("loadingContext must have a loader.")
    loader = loadingContext.loader
    loadingContext = loadingContext.copy()

    if not isinstance(workflowobj, MutableMapping):
        raise ValueError("workflowobj must be a dict, got '{}': {}".format(
            type(workflowobj), workflowobj))

    jobobj = None
    if "cwl:tool" in workflowobj:
        # This is a job/input object: keep its fields aside and load the tool
        # it points at instead.
        jobobj, _ = loader.resolve_all(workflowobj, uri)
        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
        del cast(Dict[Text, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]

        workflowobj = fetch_document(uri, loadingContext)[1]

    fileuri = urllib.parse.urldefrag(uri)[0]

    cwlVersion = loadingContext.metadata.get("cwlVersion")
    if not cwlVersion:
        cwlVersion = workflowobj.get("cwlVersion")
    if not cwlVersion and fileuri != uri:
        # The tool we're loading is a fragment of a bigger file. Get
        # the document root element and look for cwlVersion there.
        metadata = fetch_document(fileuri, loadingContext)[1]  # type: Dict[Text, Any]
        cwlVersion = metadata.get("cwlVersion")
    if not cwlVersion:
        raise ValidationException(
            "No cwlVersion found. "
            "Use the following syntax in your CWL document to declare "
            "the version: cwlVersion: <version>.\n"
            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
            "will need to be upgraded first.")

    if not isinstance(cwlVersion, string_types):
        with SourceLine(workflowobj, "cwlVersion", ValidationException):
            raise ValidationException("'cwlVersion' must be a string, "
                                      "got {}".format(
                                          type(cwlVersion)))
    # Strip a "cwl:" prefix or the expanded CWL namespace from the version.
    cwlVersion = re.sub(
        r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
        cwlVersion)
    if cwlVersion not in ALLUPDATES:
        # List every supported cwlVersion, flagging the dev-only ones.
        versions = sorted(
            version + " (with --enable-dev flag only)" if "dev" in version else version
            for version in ALLUPDATES)
        raise ValidationException(
            "The CWL reference runner no longer supports pre CWL v1.0 "
            "documents. Supported versions are: "
            "\n{}".format("\n".join(versions)))

    if isinstance(jobobj, CommentedMap) and "http://commonwl.org/cwltool#overrides" in jobobj:
        loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
        del jobobj["http://commonwl.org/cwltool#overrides"]

    if isinstance(jobobj, CommentedMap) and "https://w3id.org/cwl/cwl#requirements" in jobobj:
        # Requirements in the input object only exist from CWL v1.1 on. Pre-v1.0
        # documents were rejected above, so v1.0 is the only supported version
        # lacking them. Newer (including dev) versions must be accepted, as the
        # message promises.
        if cwlVersion == "v1.0":
            raise ValidationException(
                "`cwl:requirements` in the input object is not part of CWL "
                "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                "can set the cwlVersion to v1.1 or greater.")
        loadingContext.overrides_list.append({"overrideTarget": uri,
                                              "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"]})
        del jobobj["https://w3id.org/cwl/cwl#requirements"]

    (sch_document_loader, avsc_names) = \
        process.get_schema(cwlVersion)[:2]

    # get_schema reports a schema-loading failure by returning the exception.
    if isinstance(avsc_names, Exception):
        raise avsc_names

    document_loader = Loader(sch_document_loader.ctx,
                             schemagraph=sch_document_loader.graph,
                             idx=loader.idx,
                             cache=sch_document_loader.cache,
                             fetcher_constructor=loadingContext.fetcher_constructor,
                             skip_schemas=skip_schemas)

    if cwlVersion == "v1.0":
        _add_blank_ids(workflowobj)

    processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
    # type: Union[CommentedMap, CommentedSeq, Text, None], Any
    if loadingContext.metadata:
        metadata = loadingContext.metadata
    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")
    if not isinstance(metadata, CommentedMap):
        raise ValidationException("metadata must be a CommentedMap, was %s" % type(metadata))

    if isinstance(processobj, CommentedMap):
        uri = processobj["id"]

    _convert_stdstreams_to_files(workflowobj)

    if preprocess_only:
        return loadingContext, uri

    if loadingContext.do_validate:
        schema.validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)

    # None means default behavior (do update)
    if loadingContext.do_update in (True, None):
        if "cwlVersion" not in metadata:
            metadata["cwlVersion"] = cwlVersion
        processobj = update.update(
            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata)
        document_loader.idx[processobj["id"]] = processobj

    if jobobj is not None:
        loadingContext.jobdefaults = jobobj

    loadingContext.loader = document_loader
    loadingContext.avsc_names = avsc_names
    loadingContext.metadata = metadata

    return loadingContext, uri
326
327
def make_tool(uri,  # type: Union[Text, CommentedMap, CommentedSeq]
              loadingContext  # type: LoadingContext
              ):  # type: (...) -> Process
    """Make a Python CWL object.

    Resolves ``uri`` with ``loadingContext.loader``. A list result (a
    ``$graph`` document) must contain an entry whose id ends in ``#main``.
    A mapping result is used directly. The process object is then built with
    ``loadingContext.construct_tool_object``. When
    ``loadingContext.jobdefaults`` is set, any job value whose key matches an
    input's short name becomes that input's ``default``.

    Raises:
        ValueError: the context has no loader.
        WorkflowException: a graph without a ``#main`` entry (the message
            lists the available ids).
        Exception: ``uri`` resolves to neither a list nor a dict.
    """
    if loadingContext.loader is None:
        raise ValueError("loadingContext must have a loader")
    resolveduri, _ = loadingContext.loader.resolve_ref(uri)

    processobj = None
    if isinstance(resolveduri, MutableSequence):
        for obj in resolveduri:
            # Skip entries without an id instead of failing with a bare
            # KeyError; the error below tolerates such entries as well.
            if "id" in obj and obj['id'].endswith('#main'):
                processobj = obj
                break
        if not processobj:
            raise WorkflowException(
                u"Tool file contains graph of multiple objects, must specify "
                "one of #%s" % ", #".join(
                    urllib.parse.urldefrag(i["id"])[1] for i in resolveduri
                    if "id" in i))
    elif isinstance(resolveduri, MutableMapping):
        processobj = resolveduri
    else:
        raise Exception("Must resolve to list or dict")

    tool = loadingContext.construct_tool_object(processobj, loadingContext)

    if loadingContext.jobdefaults:
        jobobj = loadingContext.jobdefaults
        for inp in tool.tool["inputs"]:
            name = shortname(inp["id"])
            if name in jobobj:
                inp["default"] = jobobj[name]

    return tool
362
363
def load_tool(argsworkflow, loadingContext=None):
    # type: (Union[Text, Dict[Text, Any]], Optional[LoadingContext]) -> Process
    """Load a CWL process from a reference or an in-memory dict.

    Fetches the document (``fetch_document``), resolves and validates it
    (``resolve_and_validate_document``), and builds the Process object
    (``make_tool``).
    """
    context, document, doc_uri = fetch_document(argsworkflow, loadingContext)
    context, process_uri = resolve_and_validate_document(context, document, doc_uri)
    return make_tool(process_uri, context)
376
def resolve_overrides(ov, ov_uri, baseurl):
    # type: (CommentedMap, Text, Text) -> List[Dict[Text, Any]]
    """Resolve an overrides document and return its list of overrides.

    ``ov`` is first resolved against ``baseurl`` with the ``overrides_ctx``
    context. The result is then resolved against ``ov_uri`` with the
    document loader of the CWL "v1.0" schema, which is hard-coded here
    regardless of the document's version.

    Raises Exception if the first resolution does not yield a CommentedMap.
    """
    resolved = Loader(overrides_ctx).resolve_all(ov, baseurl)[0]
    if not isinstance(resolved, CommentedMap):
        raise Exception("Expected CommentedMap, got %s" % type(resolved))
    v1_0_loader = get_schema("v1.0")[0]
    v1_0_loader.resolve_all(resolved, ov_uri)
    overrides = resolved["http://commonwl.org/cwltool#overrides"]
    return cast(List[Dict[Text, Any]], overrides)
389
def load_overrides(ov, base_url):
    # type: (Text, Text) -> List[Dict[Text, Any]]
    """Fetch the overrides document at ``ov`` and return its resolved overrides.

    The document is fetched with an ``overrides_ctx`` Loader and then passed
    to ``resolve_overrides`` with ``ov`` as its URI and ``base_url`` as the
    base for resolution.
    """
    document = Loader(overrides_ctx).fetch(ov)
    return resolve_overrides(document, ov, base_url)