Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/tests/test_pack.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 import json | |
| 2 from ruamel import yaml | |
| 3 import os | |
| 4 import tempfile | |
| 5 from functools import partial | |
| 6 | |
| 7 from six import StringIO | |
| 8 import pytest | |
| 9 | |
| 10 import cwltool.pack | |
| 11 import cwltool.workflow | |
| 12 from cwltool import load_tool | |
| 13 from cwltool.load_tool import fetch_document, resolve_and_validate_document | |
| 14 from cwltool.main import main, make_relative, print_pack | |
| 15 from cwltool.pathmapper import adjustDirObjs, adjustFileObjs | |
| 16 from cwltool.resolver import tool_resolver | |
| 17 from cwltool.context import LoadingContext | |
| 18 | |
| 19 from .util import get_data, needs_docker | |
| 20 | |
| 21 | |
| 22 def test_pack(): | |
| 23 loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/revsort.cwl")) | |
| 24 loadingContext.do_update = False | |
| 25 loadingContext, uri = resolve_and_validate_document( | |
| 26 loadingContext, workflowobj, uri) | |
| 27 processobj = loadingContext.loader.resolve_ref(uri)[0] | |
| 28 | |
| 29 with open(get_data("tests/wf/expect_packed.cwl")) as packed_file: | |
| 30 expect_packed = yaml.safe_load(packed_file) | |
| 31 | |
| 32 packed = cwltool.pack.pack(loadingContext.loader, processobj, uri, loadingContext.metadata) | |
| 33 adjustFileObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) | |
| 34 adjustDirObjs(packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))) | |
| 35 | |
| 36 assert "$schemas" in packed | |
| 37 assert len(packed["$schemas"]) == len(expect_packed["$schemas"]) | |
| 38 del packed["$schemas"] | |
| 39 del expect_packed["$schemas"] | |
| 40 | |
| 41 assert packed == expect_packed | |
| 42 | |
| 43 | |
| 44 def test_pack_single_tool(): | |
| 45 loadingContext, workflowobj, uri = fetch_document( | |
| 46 get_data("tests/wf/formattest.cwl")) | |
| 47 loadingContext.do_update = False | |
| 48 loadingContext, uri = resolve_and_validate_document( | |
| 49 loadingContext, workflowobj, uri) | |
| 50 processobj = loadingContext.loader.resolve_ref(uri)[0] | |
| 51 | |
| 52 packed = cwltool.pack.pack(loadingContext.loader, processobj, uri, loadingContext.metadata) | |
| 53 assert "$schemas" in packed | |
| 54 | |
| 55 def test_pack_rewrites(): | |
| 56 rewrites = {} | |
| 57 | |
| 58 loadingContext, workflowobj, uri = fetch_document(get_data("tests/wf/default-wf5.cwl")) | |
| 59 loadingContext.do_update = False | |
| 60 loadingContext, uri = resolve_and_validate_document( | |
| 61 loadingContext, workflowobj, uri) | |
| 62 processobj = loadingContext.loader.resolve_ref(uri)[0] | |
| 63 | |
| 64 cwltool.pack.pack(loadingContext.loader, processobj, uri, loadingContext.metadata, rewrite_out=rewrites) | |
| 65 | |
| 66 assert len(rewrites) == 6 | |
| 67 | |
| 68 cwl_missing_version_paths = [ | |
| 69 "tests/wf/hello_single_tool.cwl", | |
| 70 "tests/wf/hello-workflow.cwl" | |
| 71 ] | |
| 72 | |
| 73 @pytest.mark.parametrize('cwl_path', cwl_missing_version_paths) | |
| 74 def test_pack_missing_cwlVersion(cwl_path): | |
| 75 """Ensure the generated pack output is not missing the `cwlVersion` in case of single tool workflow and single step workflow.""" | |
| 76 # Testing single tool workflow | |
| 77 loadingContext, workflowobj, uri = fetch_document(get_data(cwl_path)) | |
| 78 loadingContext.do_update = False | |
| 79 loadingContext, uri = resolve_and_validate_document( | |
| 80 loadingContext, workflowobj, uri) | |
| 81 processobj = loadingContext.loader.resolve_ref(uri)[0] | |
| 82 | |
| 83 # generate pack output dict | |
| 84 packed = json.loads(print_pack(loadingContext.loader, processobj, uri, loadingContext.metadata)) | |
| 85 | |
| 86 assert packed["cwlVersion"] == 'v1.0' | |
| 87 | |
| 88 def test_pack_idempotence_tool(): | |
| 89 """Ensure that pack produces exactly the same document for an already packed CommandLineTool.""" | |
| 90 _pack_idempotently("tests/wf/hello_single_tool.cwl") | |
| 91 | |
| 92 def test_pack_idempotence_workflow(): | |
| 93 """Ensure that pack produces exactly the same document for an already packed workflow.""" | |
| 94 _pack_idempotently("tests/wf/count-lines1-wf.cwl") | |
| 95 | |
| 96 def _pack_idempotently(document): | |
| 97 loadingContext, workflowobj, uri = fetch_document( | |
| 98 get_data(document)) | |
| 99 loadingContext.do_update = False | |
| 100 loadingContext, uri = resolve_and_validate_document( | |
| 101 loadingContext, workflowobj, uri) | |
| 102 processobj = loadingContext.loader.resolve_ref(uri)[0] | |
| 103 | |
| 104 # generate pack output dict | |
| 105 packed = json.loads(print_pack(loadingContext.loader, processobj, uri, loadingContext.metadata)) | |
| 106 | |
| 107 loadingContext, workflowobj, uri2 = fetch_document(packed) | |
| 108 loadingContext.do_update = False | |
| 109 loadingContext, uri2 = resolve_and_validate_document( | |
| 110 loadingContext, workflowobj, uri) | |
| 111 processobj = loadingContext.loader.resolve_ref(uri2)[0] | |
| 112 double_packed = json.loads(print_pack(loadingContext.loader, processobj, uri2, loadingContext.metadata)) | |
| 113 assert packed == double_packed | |
| 114 | |
| 115 cwl_to_run = [ | |
| 116 ("tests/wf/count-lines1-wf.cwl", | |
| 117 "tests/wf/wc-job.json", | |
| 118 False | |
| 119 ), | |
| 120 ("tests/wf/formattest.cwl", | |
| 121 "tests/wf/formattest-job.json", | |
| 122 True | |
| 123 ), | |
| 124 ] | |
| 125 | |
| 126 @needs_docker | |
| 127 @pytest.mark.parametrize('wf_path,job_path,namespaced', cwl_to_run) | |
| 128 def test_packed_workflow_execution(wf_path, job_path, namespaced, tmpdir): | |
| 129 loadingContext = LoadingContext() | |
| 130 loadingContext.resolver = tool_resolver | |
| 131 loadingContext, workflowobj, uri = fetch_document( | |
| 132 get_data(wf_path), loadingContext) | |
| 133 loadingContext.do_update = False | |
| 134 loadingContext, uri = resolve_and_validate_document( | |
| 135 loadingContext, workflowobj, uri) | |
| 136 processobj = loadingContext.loader.resolve_ref(uri)[0] | |
| 137 packed = json.loads(print_pack(loadingContext.loader, processobj, uri, loadingContext.metadata)) | |
| 138 | |
| 139 assert not namespaced or "$namespaces" in packed | |
| 140 | |
| 141 wf_packed_handle, wf_packed_path = tempfile.mkstemp() | |
| 142 with open(wf_packed_path, 'w') as temp_file: | |
| 143 json.dump(packed, temp_file) | |
| 144 | |
| 145 normal_output = StringIO() | |
| 146 packed_output = StringIO() | |
| 147 | |
| 148 normal_params = ['--outdir', str(tmpdir), get_data(wf_path), get_data(job_path)] | |
| 149 packed_params = ['--outdir', str(tmpdir), '--debug', wf_packed_path, get_data(job_path)] | |
| 150 | |
| 151 assert main(normal_params, stdout=normal_output) == 0 | |
| 152 assert main(packed_params, stdout=packed_output) == 0 | |
| 153 | |
| 154 assert json.loads(packed_output.getvalue()) == json.loads(normal_output.getvalue()) | |
| 155 | |
| 156 os.close(wf_packed_handle) | |
| 157 os.remove(wf_packed_path) |
