Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/cwltool/tests/test_load_tool.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 from cwltool.load_tool import load_tool | |
| 2 from cwltool.context import LoadingContext, RuntimeContext | |
| 3 from cwltool.errors import WorkflowException | |
| 4 from cwltool.update import INTERNAL_VERSION | |
| 5 import pytest | |
| 6 from .util import (get_data, get_main_output, | |
| 7 get_windows_safe_factory, | |
| 8 needs_docker, working_directory, | |
| 9 needs_singularity, temp_dir, | |
| 10 windows_needs_docker) | |
| 11 from cwltool.resolver import Path, resolve_local | |
| 12 from .test_fetch import norm | |
| 13 | |
| 14 @windows_needs_docker | |
| 15 def test_check_version(): | |
| 16 """ | |
| 17 It is permitted to load without updating, but not execute. | |
| 18 | |
| 19 Attempting to execute without updating to the internal version should raise an error. | |
| 20 """ | |
| 21 joborder = {"inp": "abc"} | |
| 22 loadingContext = LoadingContext({"do_update": True}) | |
| 23 tool = load_tool(get_data("tests/echo.cwl"), loadingContext) | |
| 24 for j in tool.job(joborder, None, RuntimeContext()): | |
| 25 pass | |
| 26 | |
| 27 loadingContext = LoadingContext({"do_update": False}) | |
| 28 tool = load_tool(get_data("tests/echo.cwl"), loadingContext) | |
| 29 with pytest.raises(WorkflowException): | |
| 30 for j in tool.job(joborder, None, RuntimeContext()): | |
| 31 pass | |
| 32 | |
| 33 def test_use_metadata(): | |
| 34 """Use the version from loadingContext.metadata if cwlVersion isn't present in the document.""" | |
| 35 loadingContext = LoadingContext({"do_update": False}) | |
| 36 tool = load_tool(get_data("tests/echo.cwl"), loadingContext) | |
| 37 | |
| 38 loadingContext = LoadingContext() | |
| 39 loadingContext.metadata = tool.metadata | |
| 40 tooldata = tool.tool.copy() | |
| 41 del tooldata["cwlVersion"] | |
| 42 tool2 = load_tool(tooldata, loadingContext) | |
| 43 | |
| 44 def test_checklink_outputSource(): | |
| 45 """Is outputSource resolved correctly independent of value of do_validate.""" | |
| 46 outsrc = norm(Path(get_data("tests/wf/1st-workflow.cwl")).as_uri())+"#argument/classfile" | |
| 47 | |
| 48 loadingContext = LoadingContext({"do_validate": True}) | |
| 49 tool = load_tool(get_data("tests/wf/1st-workflow.cwl"), loadingContext) | |
| 50 assert norm(tool.tool["outputs"][0]["outputSource"]) == outsrc | |
| 51 | |
| 52 loadingContext = LoadingContext({"do_validate": False}) | |
| 53 tool = load_tool(get_data("tests/wf/1st-workflow.cwl"), loadingContext) | |
| 54 assert norm(tool.tool["outputs"][0]["outputSource"]) == outsrc | |
| 55 | |
| 56 def test_load_graph_fragment(): | |
| 57 """Reloading from a dictionary without a cwlVersion.""" | |
| 58 loadingContext = LoadingContext() | |
| 59 uri = Path(get_data("tests/wf/scatter-wf4.cwl")).as_uri()+"#main" | |
| 60 tool = load_tool(uri, loadingContext) | |
| 61 | |
| 62 rs, metadata = tool.doc_loader.resolve_ref(uri) | |
| 63 # Reload from a dict (in 'rs'), not a URI. The dict is a fragment | |
| 64 # of original document and doesn't have cwlVersion set, so test | |
| 65 # that it correctly looks up the root document to get the | |
| 66 # cwlVersion. | |
| 67 tool = load_tool(tool.tool, loadingContext) | |
| 68 assert tool.metadata["cwlVersion"] == INTERNAL_VERSION |
