Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/load_tool.py @ 2:6af9afd405e9 draft
"planemo upload commit 0a63dd5f4d38a1f6944587f52a8cd79874177fc1"
| author | shellac |
|---|---|
| date | Thu, 14 May 2020 14:56:58 -0400 |
| parents | 26e78fe6e8c4 |
| children |
comparison
equal
deleted
inserted
replaced
| 1:75ca89e9b81c | 2:6af9afd405e9 |
|---|---|
| 1 """Loads a CWL document.""" | |
| 2 from __future__ import absolute_import | |
| 3 | |
| 4 import hashlib | |
| 5 import logging | |
| 6 import os | |
| 7 import re | |
| 8 import uuid | |
| 9 from typing import (Any, Callable, Dict, List, MutableMapping, MutableSequence, | |
| 10 Optional, Tuple, Union, cast) | |
| 11 | |
| 12 import requests.sessions | |
| 13 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
| 14 from schema_salad import schema | |
| 15 from schema_salad.ref_resolver import (ContextType, # pylint: disable=unused-import | |
| 16 Fetcher, Loader, file_uri, SubLoader) | |
| 17 from schema_salad.sourceline import SourceLine, cmap | |
| 18 from schema_salad.validate import ValidationException | |
| 19 from six import itervalues, string_types | |
| 20 from six.moves import urllib | |
| 21 from typing_extensions import Text # pylint: disable=unused-import | |
| 22 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 23 | |
| 24 from . import process, update | |
| 25 from .context import LoadingContext # pylint: disable=unused-import | |
| 26 from .errors import WorkflowException | |
| 27 from .loghandler import _logger | |
| 28 from .process import (Process, get_schema, # pylint: disable=unused-import | |
| 29 shortname) | |
| 30 from .software_requirements import ( # pylint: disable=unused-import | |
| 31 DependenciesConfiguration) | |
| 32 from .update import ALLUPDATES | |
| 33 from .utils import json_dumps | |
| 34 | |
| 35 | |
| 36 | |
| 37 | |
# Base JSON-LD context for the Loader built by default_loader(): binds the
# "cwl" and "cwltool" prefixes to their namespaces and declares "path",
# "location" and "id" as identifier (@id) fields so they are resolved as
# URIs.
jobloaderctx = {
    u"cwl": "https://w3id.org/cwl/cwl#",
    u"cwltool": "http://commonwl.org/cwltool#",
    u"path": {u"@type": u"@id"},
    u"location": {u"@type": u"@id"},
    u"id": u"@id"
}  # type: ContextType


# JSON-LD context for loading cwltool:overrides documents (used by
# resolve_overrides and load_overrides). The "mapSubject" entries presumably
# let overrides be written in map form keyed by overrideTarget, and
# requirements keyed by class (schema-salad idmap) -- confirm against
# schema-salad docs.
overrides_ctx = {
    u"overrideTarget": {u"@type": u"@id"},
    u"cwltool": "http://commonwl.org/cwltool#",
    u"http://commonwl.org/cwltool#overrides": {
        "@id": "cwltool:overrides",
        "mapSubject": "overrideTarget",
    },
    "requirements": {
        "@id": "https://w3id.org/cwl/cwl#requirements",
        "mapSubject": "class"
    }
}  # type: ContextType


# Factory signature for building a schema-salad Fetcher from a
# Dict[Text, Union[Text, bool]] (presumably the fetch cache -- confirm) and a
# requests session.
FetcherConstructorType = Callable[
    [Dict[Text, Union[Text, bool]], requests.sessions.Session], Fetcher]
# Resolver signature: given a Loader and a tool reference, return the resolved
# URI (resolve_tool_uri treats a None result as "not found").
ResolverType = Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
| 64 | |
def default_loader(fetcher_constructor=None, enable_dev=False):
    # type: (Optional[FetcherConstructorType], bool) -> Loader
    """Build a schema-salad Loader that uses the job loading context.

    :param fetcher_constructor: optional factory for the Loader's Fetcher.
    :param enable_dev: when true, attachments are allowed for every
        reference; otherwise they are refused for every reference.
    """
    def _attachments_allowed(_ref):
        # The decision does not depend on the reference itself.
        return enable_dev

    return Loader(
        jobloaderctx,
        fetcher_constructor=fetcher_constructor,
        allow_attachments=_attachments_allowed,
    )
| 69 | |
def resolve_tool_uri(argsworkflow,  # type: Text
                     resolver=None,  # type: Optional[ResolverType]
                     fetcher_constructor=None,  # type: Optional[FetcherConstructorType]
                     document_loader=None  # type: Optional[Loader]
                     ):  # type: (...) -> Tuple[Text, Text]
    """Turn a tool reference into an absolute URI.

    Resolution order:

    1. an ``http``, ``https`` or ``file`` URL is used unchanged;
    2. an existing local path becomes a ``file://`` URI;
    3. otherwise ``resolver`` (when given) is consulted, using
       ``document_loader`` or, failing that, a default loader built from
       ``fetcher_constructor``.

    :returns: ``(uri, fileuri)`` where ``fileuri`` is ``uri`` without its
        ``#fragment``.
    :raises ValidationException: if the reference cannot be resolved.
    """
    scheme = urllib.parse.urlsplit(argsworkflow).scheme

    # Only these schemes count as URLs: on Windows, urlsplit reports a drive
    # letter (e.g. "C:") as the scheme, and such paths must fall through to
    # the filesystem check below.
    if scheme in (u'http', u'https', u'file'):
        uri = argsworkflow  # type: Optional[Text]
    elif os.path.exists(os.path.abspath(argsworkflow)):
        uri = file_uri(str(os.path.abspath(argsworkflow)))
    elif resolver is None:
        uri = None
    else:
        loader = document_loader
        if loader is None:
            loader = default_loader(fetcher_constructor)
        uri = resolver(loader, argsworkflow)

    if uri is None:
        raise ValidationException("Not found: '%s'" % argsworkflow)
    if uri != argsworkflow:
        _logger.info("Resolved '%s' to '%s'", argsworkflow, uri)

    return uri, urllib.parse.urldefrag(uri)[0]
| 96 | |
| 97 | |
def fetch_document(argsworkflow,  # type: Union[Text, Dict[Text, Any]]
                   loadingContext=None  # type: Optional[LoadingContext]
                   ):  # type: (...) -> Tuple[LoadingContext, CommentedMap, Text]
    """Retrieve a CWL document.

    ``argsworkflow`` may be:

    * a string: resolved with resolve_tool_uri and fetched with the
      context's loader;
    * a dict: wrapped as a CommentedMap and registered in the loader's
      ``idx`` under its ``id``, or under a fresh ``_:<uuid>`` id when it
      has none.

    A copy of ``loadingContext`` is used, or a new LoadingContext when none
    is given; a default loader is installed on it if it lacks one.

    :returns: ``(loadingContext, document, uri)``.
    :raises ValidationException: if ``argsworkflow`` is neither a string
        nor a dict.
    """
    if loadingContext is not None:
        loadingContext = loadingContext.copy()
        if loadingContext.loader is None:
            loadingContext.loader = default_loader(loadingContext.fetcher_constructor)
    else:
        loadingContext = LoadingContext()
        loadingContext.loader = default_loader()

    if isinstance(argsworkflow, string_types):
        uri, fileuri = resolve_tool_uri(argsworkflow,
                                        resolver=loadingContext.resolver,
                                        document_loader=loadingContext.loader)
        return loadingContext, loadingContext.loader.fetch(fileuri), uri

    if not isinstance(argsworkflow, dict):
        raise ValidationException("Must be URI or object: '%s'" % argsworkflow)

    if argsworkflow.get("id"):
        uri = argsworkflow["id"]
    else:
        uri = "_:" + Text(uuid.uuid4())
    workflowobj = cast(CommentedMap, cmap(argsworkflow, fn=uri))
    loadingContext.loader.idx[uri] = workflowobj
    return loadingContext, workflowobj, uri
| 122 | |
| 123 | |
def _convert_stdstreams_to_files(workflowobj):
    # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None
    """Expand the stdin/stdout/stderr type shortcuts into explicit File types.

    Walks ``workflowobj`` in place and returns None. For every mapping whose
    ``class`` is ``CommandLineTool``:

    * an output of type ``stdout``/``stderr`` becomes a ``File`` output whose
      ``outputBinding`` globs the tool's ``stdout``/``stderr`` filename. When
      the tool does not name one, a filename is generated from a SHA-1 of the
      tool's JSON and stored back on the tool.
    * an input of type ``stdin`` becomes a ``File`` input, and the tool's
      ``stdin`` is set to ``$(inputs.<name>.path)``, where ``<name>`` is the
      part of the input id after the last ``#``.

    Other mappings have their values walked recursively, and sequences have
    their items walked recursively. The values of a CommandLineTool mapping
    are not walked further.

    Raises ValidationException when:

    * ``outputs`` is not a CommentedSeq. A missing ``outputs`` key also fails
      this check, because the ``[]`` default is a plain list.
    * an output is not a CommentedMap.
    * a shortcut is combined with an explicit ``outputBinding``,
      ``inputBinding`` or ``stdin``.
    """

    if isinstance(workflowobj, MutableMapping):
        if workflowobj.get('class') == 'CommandLineTool':
            # NOTE(review): SourceLine presumably re-raises errors from this
            # block as ValidationException annotated with the position of the
            # "outputs" key (with traceback when DEBUG logging is enabled);
            # confirm against schema_salad.sourceline.
            with SourceLine(workflowobj, "outputs", ValidationException,
                            _logger.isEnabledFor(logging.DEBUG)):
                outputs = workflowobj.get('outputs', [])
                if not isinstance(outputs, CommentedSeq):
                    raise ValidationException('"outputs" section is not '
                                              'valid.')
                for out in workflowobj.get('outputs', []):
                    if not isinstance(out, CommentedMap):
                        raise ValidationException(
                            "Output '{}' is not a valid "
                            "OutputParameter.".format(out))
                    for streamtype in ['stdout', 'stderr']:
                        if out.get('type') == streamtype:
                            if 'outputBinding' in out:
                                raise ValidationException(
                                    "Not allowed to specify outputBinding when"
                                    " using %s shortcut." % streamtype)
                            if streamtype in workflowobj:
                                filename = workflowobj[streamtype]
                            else:
                                # The digest covers the whole tool as mutated so
                                # far (sort_keys gives a stable serialization).
                                # The stdout and stderr shortcuts therefore
                                # hash different content, and the order of
                                # these statements matters. SHA-1 only derives
                                # a filename here, hence the nosec marker.
                                filename = Text(
                                    hashlib.sha1(  # nosec
                                        json_dumps(workflowobj, sort_keys=True
                                                   ).encode('utf-8')).hexdigest())
                                workflowobj[streamtype] = filename
                            out['type'] = 'File'
                            out['outputBinding'] = cmap({'glob': filename})
            # NOTE(review): assumes "inputs" has already been normalized to a
            # list of mappings (inp.get is called on each entry); confirm.
            for inp in workflowobj.get('inputs', []):
                if inp.get('type') == 'stdin':
                    if 'inputBinding' in inp:
                        raise ValidationException(
                            "Not allowed to specify inputBinding when"
                            " using stdin shortcut.")
                    if 'stdin' in workflowobj:
                        raise ValidationException(
                            "Not allowed to specify stdin path when"
                            " using stdin type shortcut.")
                    else:
                        # Use the input's short name (text after the last '#').
                        workflowobj['stdin'] = \
                            "$(inputs.%s.path)" % \
                            inp['id'].rpartition('#')[2]
                        inp['type'] = 'File'
        else:
            for entry in itervalues(workflowobj):
                _convert_stdstreams_to_files(entry)
    if isinstance(workflowobj, MutableSequence):
        for entry in workflowobj:
            _convert_stdstreams_to_files(entry)
| 177 | |
| 178 def _add_blank_ids(workflowobj): | |
| 179 # type: (Union[Dict[Text, Any], List[Dict[Text, Any]]]) -> None | |
| 180 | |
| 181 if isinstance(workflowobj, MutableMapping): | |
| 182 if ("run" in workflowobj and | |
| 183 isinstance(workflowobj["run"], MutableMapping) and | |
| 184 "id" not in workflowobj["run"] and | |
| 185 "$import" not in workflowobj["run"]): | |
| 186 workflowobj["run"]["id"] = Text(uuid.uuid4()) | |
| 187 for entry in itervalues(workflowobj): | |
| 188 _add_blank_ids(entry) | |
| 189 if isinstance(workflowobj, MutableSequence): | |
| 190 for entry in workflowobj: | |
| 191 _add_blank_ids(entry) | |
| 192 | |
def resolve_and_validate_document(
        loadingContext,  # type: LoadingContext
        workflowobj,  # type: Union[CommentedMap, CommentedSeq]
        uri,  # type: Text
        preprocess_only=False,  # type: bool
        skip_schemas=None,  # type: Optional[bool]
):
    # type: (...) -> Tuple[LoadingContext, Text]
    """Validate a CWL document.

    Resolves ``workflowobj`` against the CWL schema for its ``cwlVersion``,
    expands the stdin/stdout/stderr shortcuts, then (unless
    ``preprocess_only``) optionally validates and updates it. Results are
    recorded on a copy of ``loadingContext``.

    If ``workflowobj`` contains ``cwl:tool`` (a job object naming its tool),
    the referenced tool is fetched and processed instead. The remaining job
    fields become ``jobdefaults``, and any ``cwltool:overrides`` or
    ``cwl:requirements`` in the job are appended to ``overrides_list``.

    :param loadingContext: must have a ``loader``; a copy is updated and
        returned.
    :param workflowobj: the fetched document; must be a mapping.
    :param uri: the document's URI.
    :param preprocess_only: return right after resolution and shortcut
        expansion, skipping validation and update.
    :param skip_schemas: forwarded to the schema-salad ``Loader``.
    :returns: ``(loadingContext, uri)``. The first element is the context
        copy; its ``loader``, ``avsc_names`` and ``metadata`` are set unless
        ``preprocess_only``. The second is the resolved process id.
    :raises ValueError: if the context has no loader or ``workflowobj`` is
        not a mapping.
    :raises ValidationException: if ``cwlVersion`` is missing, not a string
        or unsupported. Also raised for ``cwl:requirements`` in a job whose
        version is not v1.1/v1.1.0-dev1, and when the document does not
        resolve to a CommentedMap/CommentedSeq.
    """
    if not loadingContext.loader:
        raise ValueError("loadingContext must have a loader.")
    loader = loadingContext.loader
    loadingContext = loadingContext.copy()

    if not isinstance(workflowobj, MutableMapping):
        raise ValueError("workflowobj must be a dict, got '{}': {}".format(
            type(workflowobj), workflowobj))

    jobobj = None
    if "cwl:tool" in workflowobj:
        # workflowobj is a job object naming the tool to run. The lookup below
        # presumably relies on resolve_all expanding "cwl:tool" to its full
        # IRI in place.
        jobobj, _ = loader.resolve_all(workflowobj, uri)
        uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"])
        del cast(Dict[Text, Any], jobobj)["https://w3id.org/cwl/cwl#tool"]

        workflowobj = fetch_document(uri, loadingContext)[1]

    fileuri = urllib.parse.urldefrag(uri)[0]

    cwlVersion = loadingContext.metadata.get("cwlVersion")
    if not cwlVersion:
        cwlVersion = workflowobj.get("cwlVersion")
    if not cwlVersion and fileuri != uri:
        # The tool we're loading is a fragment of a bigger file. Get
        # the document root element and look for cwlVersion there.
        metadata = fetch_document(fileuri, loadingContext)[1]  # type: Dict[Text, Any]
        cwlVersion = metadata.get("cwlVersion")
    if not cwlVersion:
        raise ValidationException(
            "No cwlVersion found. "
            "Use the following syntax in your CWL document to declare "
            "the version: cwlVersion: <version>.\n"
            "Note: if this is a CWL draft-2 (pre v1.0) document then it "
            "will need to be upgraded first.")

    if not isinstance(cwlVersion, string_types):
        with SourceLine(workflowobj, "cwlVersion", ValidationException):
            raise ValidationException("'cwlVersion' must be a string, "
                                      "got {}".format(
                                          type(cwlVersion)))
    # Strip an optional "cwl:" prefix or full CWL namespace IRI.
    cwlVersion = re.sub(
        r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
        cwlVersion)
    if cwlVersion not in ALLUPDATES:
        # List every supported cwlVersion in the error message.
        versions = sorted(
            version + " (with --enable-dev flag only)" if "dev" in version
            else version
            for version in ALLUPDATES)
        raise ValidationException(
            "The CWL reference runner no longer supports pre CWL v1.0 "
            "documents. Supported versions are: "
            "\n{}".format("\n".join(versions)))

    if isinstance(jobobj, CommentedMap) and "http://commonwl.org/cwltool#overrides" in jobobj:
        loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri))
        del jobobj["http://commonwl.org/cwltool#overrides"]

    if isinstance(jobobj, CommentedMap) and "https://w3id.org/cwl/cwl#requirements" in jobobj:
        # NOTE(review): only v1.1 / v1.1.0-dev1 are accepted although the
        # message says "v1.1 or greater"; confirm how newer versions in
        # ALLUPDATES should be treated.
        if cwlVersion not in ("v1.1.0-dev1", "v1.1"):
            raise ValidationException(
                "`cwl:requirements` in the input object is not part of CWL "
                "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                "can set the cwlVersion to v1.1 or greater.")
        loadingContext.overrides_list.append({"overrideTarget": uri,
                                              "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"]})
        del jobobj["https://w3id.org/cwl/cwl#requirements"]

    (sch_document_loader, avsc_names) = \
        process.get_schema(cwlVersion)[:2]

    if isinstance(avsc_names, Exception):
        raise avsc_names

    processobj = None  # type: Union[CommentedMap, CommentedSeq, Text, None]
    # Schema-aware loader sharing the original loader's index, so documents
    # fetched earlier (e.g. in-memory objects) remain resolvable.
    document_loader = Loader(sch_document_loader.ctx,
                             schemagraph=sch_document_loader.graph,
                             idx=loader.idx,
                             cache=sch_document_loader.cache,
                             fetcher_constructor=loadingContext.fetcher_constructor,
                             skip_schemas=skip_schemas)

    if cwlVersion == "v1.0":
        _add_blank_ids(workflowobj)

    processobj, metadata = document_loader.resolve_all(workflowobj, fileuri)
    if loadingContext.metadata:
        metadata = loadingContext.metadata
    if not isinstance(processobj, (CommentedMap, CommentedSeq)):
        raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.")
    if not isinstance(metadata, CommentedMap):
        raise ValidationException("metadata must be a CommentedMap, was %s" % type(metadata))

    if isinstance(processobj, CommentedMap):
        uri = processobj["id"]

    _convert_stdstreams_to_files(workflowobj)

    if preprocess_only:
        return loadingContext, uri

    if loadingContext.do_validate:
        schema.validate_doc(avsc_names, processobj, document_loader, loadingContext.strict)

    # None means default behavior (do update)
    if loadingContext.do_update in (True, None):
        if "cwlVersion" not in metadata:
            metadata["cwlVersion"] = cwlVersion
        processobj = update.update(
            processobj, document_loader, fileuri, loadingContext.enable_dev, metadata)
        document_loader.idx[processobj["id"]] = processobj

    if jobobj is not None:
        loadingContext.jobdefaults = jobobj

    loadingContext.loader = document_loader
    loadingContext.avsc_names = avsc_names
    loadingContext.metadata = metadata

    return loadingContext, uri
| 326 | |
| 327 | |
def make_tool(uri,  # type: Union[Text, CommentedMap, CommentedSeq]
              loadingContext  # type: LoadingContext
              ):  # type: (...) -> Process
    """Make a Python CWL object.

    Resolves ``uri`` with ``loadingContext.loader`` and builds the Process
    with ``loadingContext.construct_tool_object``. When the reference
    resolves to a list (a ``$graph``), the entry whose id ends in ``#main``
    is used. Each entry of ``loadingContext.jobdefaults`` whose key matches
    an input's short name becomes that input's ``default``.

    :raises ValueError: if ``loadingContext`` has no loader.
    :raises WorkflowException: if a graph has no ``#main`` entry, or the
        reference resolves to something other than a list or a mapping.
    """
    if loadingContext.loader is None:
        raise ValueError("loadingContext must have a loader")
    resolveduri, _ = loadingContext.loader.resolve_ref(uri)

    if isinstance(resolveduri, MutableSequence):
        # Entries without an "id" cannot be "#main". Skip them, as the error
        # message below does, instead of aborting the search with a KeyError.
        processobj = None
        for obj in resolveduri:
            if "id" in obj and obj["id"].endswith("#main"):
                processobj = obj
                break
        if processobj is None:
            raise WorkflowException(
                u"Tool file contains graph of multiple objects, must specify "
                "one of #%s" % ", #".join(
                    urllib.parse.urldefrag(i["id"])[1] for i in resolveduri
                    if "id" in i))
    elif isinstance(resolveduri, MutableMapping):
        processobj = resolveduri
    else:
        # Use WorkflowException here too, as for the graph case above,
        # rather than a bare Exception.
        raise WorkflowException("Must resolve to list or dict")

    tool = loadingContext.construct_tool_object(processobj, loadingContext)

    jobobj = loadingContext.jobdefaults
    if jobobj:
        # Job values act as defaults for the matching tool inputs.
        for inp in tool.tool["inputs"]:
            key = shortname(inp["id"])
            if key in jobobj:
                inp["default"] = jobobj[key]

    return tool
| 362 | |
| 363 | |
def load_tool(argsworkflow,  # type: Union[Text, Dict[Text, Any]]
              loadingContext=None  # type: Optional[LoadingContext]
              ):  # type: (...) -> Process
    """Fetch, resolve/validate and instantiate a CWL process in one call.

    Chains fetch_document, resolve_and_validate_document and make_tool,
    threading the LoadingContext returned by each step into the next.
    """
    fetched_ctx, document, doc_uri = fetch_document(argsworkflow, loadingContext)
    validated_ctx, tool_uri = resolve_and_validate_document(
        fetched_ctx, document, doc_uri)
    return make_tool(tool_uri, validated_ctx)
| 376 | |
def resolve_overrides(ov,  # type: CommentedMap
                      ov_uri,  # type: Text
                      baseurl  # type: Text
                      ):  # type: (...) -> List[Dict[Text, Any]]
    """Resolve a cwltool:overrides document and return its override list.

    ``ov`` is resolved against ``baseurl`` with ``overrides_ctx``. The result
    is then passed through the CWL v1.0 schema document loader against
    ``ov_uri``. That second call's return value is discarded, so it
    presumably updates the document in place.

    :raises Exception: if the first resolution does not yield a CommentedMap.
    """
    resolved = Loader(overrides_ctx).resolve_all(ov, baseurl)[0]
    if not isinstance(resolved, CommentedMap):
        raise Exception("Expected CommentedMap, got %s" % type(resolved))
    get_schema("v1.0")[0].resolve_all(resolved, ov_uri)
    overrides = resolved["http://commonwl.org/cwltool#overrides"]
    return cast(List[Dict[Text, Any]], overrides)
| 389 | |
def load_overrides(ov, base_url):  # type: (Text, Text) -> List[Dict[Text, Any]]
    """Fetch the overrides document at ``ov`` and resolve it.

    The document is fetched with a Loader built on ``overrides_ctx`` and
    handed to resolve_overrides, with ``ov`` as its own URI and ``base_url``
    as the base.
    """
    fetched = Loader(overrides_ctx).fetch(ov)
    return resolve_overrides(fetched, ov, base_url)
