Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/cwltool/tests/test_load_tool.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/cwltool/tests/test_load_tool.py Fri Jul 31 00:18:57 2020 -0400 @@ -0,0 +1,68 @@ +from cwltool.load_tool import load_tool +from cwltool.context import LoadingContext, RuntimeContext +from cwltool.errors import WorkflowException +from cwltool.update import INTERNAL_VERSION +import pytest +from .util import (get_data, get_main_output, + get_windows_safe_factory, + needs_docker, working_directory, + needs_singularity, temp_dir, + windows_needs_docker) +from cwltool.resolver import Path, resolve_local +from .test_fetch import norm + +@windows_needs_docker +def test_check_version(): + """ + It is permitted to load without updating, but not execute. + + Attempting to execute without updating to the internal version should raise an error. + """ + joborder = {"inp": "abc"} + loadingContext = LoadingContext({"do_update": True}) + tool = load_tool(get_data("tests/echo.cwl"), loadingContext) + for j in tool.job(joborder, None, RuntimeContext()): + pass + + loadingContext = LoadingContext({"do_update": False}) + tool = load_tool(get_data("tests/echo.cwl"), loadingContext) + with pytest.raises(WorkflowException): + for j in tool.job(joborder, None, RuntimeContext()): + pass + +def test_use_metadata(): + """Use the version from loadingContext.metadata if cwlVersion isn't present in the document.""" + loadingContext = LoadingContext({"do_update": False}) + tool = load_tool(get_data("tests/echo.cwl"), loadingContext) + + loadingContext = LoadingContext() + loadingContext.metadata = tool.metadata + tooldata = tool.tool.copy() + del tooldata["cwlVersion"] + tool2 = load_tool(tooldata, loadingContext) + +def test_checklink_outputSource(): + """Is outputSource resolved correctly independent of value of do_validate.""" + outsrc = norm(Path(get_data("tests/wf/1st-workflow.cwl")).as_uri())+"#argument/classfile" + + loadingContext = LoadingContext({"do_validate": True}) + tool = load_tool(get_data("tests/wf/1st-workflow.cwl"), loadingContext) + assert norm(tool.tool["outputs"][0]["outputSource"]) == outsrc + + loadingContext = LoadingContext({"do_validate": False}) + tool = load_tool(get_data("tests/wf/1st-workflow.cwl"), loadingContext) + assert norm(tool.tool["outputs"][0]["outputSource"]) == outsrc + +def test_load_graph_fragment(): + """Reloading from a dictionary without a cwlVersion.""" + loadingContext = LoadingContext() + uri = Path(get_data("tests/wf/scatter-wf4.cwl")).as_uri()+"#main" + tool = load_tool(uri, loadingContext) + + rs, metadata = tool.doc_loader.resolve_ref(uri) + # Reload from a dict (in 'rs'), not a URI. The dict is a fragment + # of original document and doesn't have cwlVersion set, so test + # that it correctly looks up the root document to get the + # cwlVersion. + tool = load_tool(tool.tool, loadingContext) + assert tool.metadata["cwlVersion"] == INTERNAL_VERSION