Repository: guerler/springsuite (Mercurial)
File: planemo/lib/python3.7/site-packages/cwltool/tests/test_provenance.py @ changeset 0:d30785e31577 (draft)
Commit: "planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
Author: guerler
Date: Fri, 31 Jul 2020 00:18:57 -0400
import json
import ntpath
import os
import posixpath
import shutil
import sys
import tempfile
from io import open
try:
    import cPickle as pickle
except ImportError:
    import pickle

from six.moves import urllib

import arcp
import pytest
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import DC, DCTERMS, RDF

import bagit
# Module to be tested
from cwltool import load_tool, provenance
from cwltool.main import main
from cwltool.resolver import Path
from cwltool.context import RuntimeContext
from cwltool.stdfsaccess import StdFsAccess

from .util import get_data, needs_docker, temp_dir, working_directory

# RDF namespaces we'll query for later
ORE = Namespace("http://www.openarchives.org/ore/terms/")
PROV = Namespace("http://www.w3.org/ns/prov#")
RO = Namespace("http://purl.org/wf4ever/ro#")
WFDESC = Namespace("http://purl.org/wf4ever/wfdesc#")
WFPROV = Namespace("http://purl.org/wf4ever/wfprov#")
SCHEMA = Namespace("http://schema.org/")
CWLPROV = Namespace("https://w3id.org/cwl/prov#")
OA = Namespace("http://www.w3.org/ns/oa#")


@pytest.fixture
def folder(tmpdir):
    directory = str(tmpdir)
    if os.environ.get("DEBUG"):
        print("%s folder: %s" % (__name__, directory))
    yield directory

    if not os.environ.get("DEBUG"):
        shutil.rmtree(directory)


def cwltool(folder, *args):
    new_args = ['--provenance', folder]
    new_args.extend(args)
    # Run within a temporary directory to not pollute git checkout
    with temp_dir("cwltool-run") as tmp_dir:
        with working_directory(tmp_dir):
            status = main(new_args)
            assert status == 0, "Failed: cwltool.main(%r)" % (args)


@needs_docker
def test_hello_workflow(folder):
    cwltool(folder, get_data('tests/wf/hello-workflow.cwl'), "--usermessage", "Hello workflow")
    check_provenance(folder)


@needs_docker
def test_hello_single_tool(folder):
    cwltool(folder, get_data('tests/wf/hello_single_tool.cwl'), "--message", "Hello tool")
    check_provenance(folder, single_tool=True)


@needs_docker
def test_revsort_workflow(folder):
    cwltool(folder, get_data('tests/wf/revsort.cwl'), get_data('tests/wf/revsort-job.json'))
    check_output_object(folder)
    check_provenance(folder)


@needs_docker
def test_nested_workflow(folder):
    cwltool(folder, get_data('tests/wf/nested.cwl'))
    check_provenance(folder, nested=True)

@needs_docker
def test_secondary_files_implicit(folder, tmpdir):
    file1 = tmpdir.join("foo1.txt")
    file1idx = tmpdir.join("foo1.txt.idx")

    with open(str(file1), "w", encoding="ascii") as f:
        f.write(u"foo")
    with open(str(file1idx), "w", encoding="ascii") as f:
        f.write(u"bar")

    # secondary will be picked up by .idx
    cwltool(folder, get_data('tests/wf/sec-wf.cwl'), "--file1", str(file1))
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)


@needs_docker
def test_secondary_files_explicit(folder, tmpdir):
    orig_tempdir = tempfile.tempdir
    tempfile.tempdir = str(tmpdir)
    # Deliberately do NOT have common basename or extension
    file1 = tempfile.mktemp("foo")
    file1idx = tempfile.mktemp("bar")

    with open(file1, "w", encoding="ascii") as f:
        f.write(u"foo")
    with open(file1idx, "w", encoding="ascii") as f:
        f.write(u"bar")

    # explicit secondaryFiles
    job = {"file1":
           {"class": "File",
            "path": file1,
            "basename": "foo1.txt",
            "secondaryFiles": [
                {
                    "class": "File",
                    "path": file1idx,
                    "basename": "foo1.txt.idx",
                }
            ]
            }
           }
    jobJson = tempfile.mktemp("job.json")
    with open(jobJson, "wb") as fp:
        j = json.dumps(job, ensure_ascii=True)
        fp.write(j.encode("ascii"))

    cwltool(folder, get_data('tests/wf/sec-wf.cwl'), jobJson)
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)
    tempfile.tempdir = orig_tempdir


@needs_docker
def test_secondary_files_output(folder):
    # secondary will be picked up by .idx
    cwltool(folder, get_data('tests/wf/sec-wf-out.cwl'))
    check_provenance(folder, secondary_files=True)
    # Skipped, not the same secondary files as above
    # self.check_secondary_files()

@needs_docker
def test_directory_workflow(folder, tmpdir):
    dir2 = tmpdir.join("dir2")
    os.makedirs(str(dir2))
    sha1 = {
        # Expected hashes of ASCII letters (no linefeed)
        # as returned from:
        # for x in a b c ; do echo -n $x | sha1sum ; done
        "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
        "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
        "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
    }
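    # The same digests can be reproduced in Python, e.g. (illustrative only;
    # hashlib is not otherwise used by this test):
    #   import hashlib
    #   hashlib.sha1(b"a").hexdigest()  # '86f7e437faa5a7fce15d1ddcb9eaeaea377667b8'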
    for x in u"abc":
        # Make test files with predictable hashes
        with open(str(dir2.join(x)), "w", encoding="ascii") as f:
            f.write(x)

    cwltool(folder, get_data('tests/wf/directory.cwl'), "--dir", str(dir2))
    check_provenance(folder, directory=True)

    # Output should include ls stdout of filenames a b c on each line
    file_list = os.path.join(
        folder, "data",
        # checksum as returned from:
        # echo -e "a\nb\nc" | sha1sum
        # 3ca69e8d6c234a469d16ac28a4a658c92267c423  -
        "3c",
        "3ca69e8d6c234a469d16ac28a4a658c92267c423")
    assert os.path.isfile(file_list)

    # Input files should be captured by hash value,
    # even if they were inside a class: Directory
    for (l, l_hash) in sha1.items():
        prefix = l_hash[:2]  # first 2 letters
        p = os.path.join(folder, "data", prefix, l_hash)
        assert os.path.isfile(p), "Could not find %s as %s" % (l, p)

def check_output_object(base_path):
    output_obj = os.path.join(base_path, "workflow", "primary-output.json")
    compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284"
    compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284"
    with open(output_obj) as fp:
        out_json = json.load(fp)
    f1 = out_json["sorted_output"]
    assert f1["checksum"] == compare_checksum
    assert f1["location"] == compare_location


def check_secondary_files(base_path):
    foo_data = os.path.join(
        base_path, "data",
        # checksum as returned from:
        # $ echo -n foo | sha1sum
        # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33  -
        "0b",
        "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
    bar_data = os.path.join(
        base_path, "data", "62", "62cdb7020ff920e5aa642c3d4066950dd1f01f4d")
    assert os.path.isfile(foo_data), "Did not capture file.txt 'foo'"
    assert os.path.isfile(bar_data), "Did not capture secondary file.txt.idx 'bar'"

    primary_job = os.path.join(base_path, "workflow", "primary-job.json")
    with open(primary_job) as fp:
        job_json = json.load(fp)
    # TODO: Verify secondaryFile in primary-job.json
    f1 = job_json["file1"]
    assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
    assert f1["basename"] == "foo1.txt"

    secondaries = f1["secondaryFiles"]
    assert secondaries
    f1idx = secondaries[0]
    assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
    assert f1idx["basename"] == "foo1.txt.idx"

def check_provenance(base_path, nested=False, single_tool=False, directory=False,
                     secondary_files=False):
    check_folders(base_path)
    check_bagit(base_path)
    check_ro(base_path, nested=nested)
    check_prov(base_path, nested=nested, single_tool=single_tool, directory=directory,
               secondary_files=secondary_files)


def check_folders(base_path):
    required_folders = [
        "data", "snapshot", "workflow", "metadata", os.path.join("metadata", "provenance")]

    for folder in required_folders:
        assert os.path.isdir(os.path.join(base_path, folder))


def check_bagit(base_path):
    # check bagit structure
    required_files = [
        "bagit.txt", "bag-info.txt", "manifest-sha1.txt",
        "tagmanifest-sha1.txt", "tagmanifest-sha256.txt"]

    for basename in required_files:
        file_path = os.path.join(base_path, basename)
        assert os.path.isfile(file_path)

    bag = bagit.Bag(base_path)
    assert bag.has_oxum()
    (only_manifest, only_fs) = bag.compare_manifests_with_fs()
    assert not list(only_manifest), "Some files only in manifest"
    assert not list(only_fs), "Some files only on file system"
    missing_tagfiles = bag.missing_optional_tagfiles()
    assert not list(missing_tagfiles), "Some files only in tagmanifest"
    bag.validate()
    # TODO: Check other bag-info attributes
    assert arcp.is_arcp_uri(bag.info.get("External-Identifier"))

def find_arcp(base_path):
    # First try to find External-Identifier
    bag = bagit.Bag(base_path)
    ext_id = bag.info.get("External-Identifier")
    if arcp.is_arcp_uri(ext_id):
        return ext_id
    raise Exception("Can't find External-Identifier")

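# Note (assumption, for orientation only): the External-Identifier recorded in
# bag-info.txt is expected to be an arcp URI of the general form
# "arcp://uuid,{workflow-run-uuid}/", which is why the checks below compare the
# .uuid field returned by arcp.parse_arcp() against the master run's urn:uuid.
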
def _arcp2file(base_path, uri):
    parsed = arcp.parse_arcp(uri)
    # arcp URIs, ensure they are local to our RO
    assert parsed.uuid == arcp.parse_arcp(find_arcp(base_path)).uuid,\
        'arcp URI must be local to the research object'

    path = parsed.path[1:]  # Strip first /
    # Convert to local path, in case it uses \ on Windows
    lpath = str(Path(path))
    return os.path.join(base_path, lpath)

def check_ro(base_path, nested=False):
    manifest_file = os.path.join(base_path, "metadata", "manifest.json")
    assert os.path.isfile(manifest_file), "Can't find " + manifest_file
    arcp_root = find_arcp(base_path)
    base = urllib.parse.urljoin(arcp_root, "metadata/manifest.json")
    g = Graph()

    # Avoid resolving JSON-LD context https://w3id.org/bundle/context
    # so this test works offline
    context = Path(get_data("tests/bundle-context.jsonld")).as_uri()
    with open(manifest_file, "r", encoding="UTF-8") as f:
        jsonld = f.read()
        # replace with file:/// URI
        jsonld = jsonld.replace("https://w3id.org/bundle/context", context)
    g.parse(data=jsonld, format="json-ld", publicID=base)
    if os.environ.get("DEBUG"):
        print("Parsed manifest:\n\n")
        g.serialize(sys.stdout, format="ttl")
    ro = None

    for ro in g.subjects(ORE.isDescribedBy, URIRef(base)):
        break
    assert ro is not None, "Can't find RO with ore:isDescribedBy"

    profile = None
    for dc in g.objects(ro, DCTERMS.conformsTo):
        profile = dc
        break
    assert profile is not None, "Can't find profile with dct:conformsTo"
    assert profile == URIRef(provenance.CWLPROV_VERSION),\
        "Unexpected cwlprov version " + profile

    paths = []
    externals = []
    for aggregate in g.objects(ro, ORE.aggregates):
        if not arcp.is_arcp_uri(aggregate):
            externals.append(aggregate)
            # Won't check external URIs existence here
            # TODO: Check they are not relative!
            continue
        lfile = _arcp2file(base_path, aggregate)
        paths.append(os.path.relpath(lfile, base_path))
        assert os.path.isfile(lfile), "Can't find aggregated " + lfile

    assert paths, "Didn't find any arcp aggregates"
    assert externals, "Didn't find any data URIs"

    for ext in ["provn", "xml", "json", "jsonld", "nt", "ttl"]:
        f = "metadata/provenance/primary.cwlprov.%s" % ext
        assert f in paths, "provenance file missing " + f

    for f in ["workflow/primary-job.json", "workflow/packed.cwl", "workflow/primary-output.json"]:
        assert f in paths, "workflow file missing " + f
    # Can't test snapshot/ files directly as their name varies

    # TODO: check urn:hash::sha1 thingies
    # TODO: Check OA annotations

    packed = urllib.parse.urljoin(arcp_root, "/workflow/packed.cwl")
    primary_job = urllib.parse.urljoin(arcp_root, "/workflow/primary-job.json")
    primary_prov_nt = urllib.parse.urljoin(arcp_root, "/metadata/provenance/primary.cwlprov.nt")
    uuid = arcp.parse_arcp(arcp_root).uuid

    highlights = set(g.subjects(OA.motivatedBy, OA.highlighting))
    assert highlights, "Didn't find highlights"
    for h in highlights:
        assert (h, OA.hasTarget, URIRef(packed)) in g

    describes = set(g.subjects(OA.motivatedBy, OA.describing))
    for d in describes:
        assert (d, OA.hasBody, URIRef(arcp_root)) in g
        assert (d, OA.hasTarget, URIRef(uuid.urn)) in g

    linked = set(g.subjects(OA.motivatedBy, OA.linking))
    for l in linked:
        assert (l, OA.hasBody, URIRef(packed)) in g
        assert (l, OA.hasBody, URIRef(primary_job)) in g
        assert (l, OA.hasTarget, URIRef(uuid.urn)) in g

    has_provenance = set(g.subjects(OA.hasBody, URIRef(primary_prov_nt)))
    for p in has_provenance:
        assert (p, OA.hasTarget, URIRef(uuid.urn)) in g
        assert (p, OA.motivatedBy, PROV.has_provenance) in g
        # Check all prov elements are listed
        formats = set()
        for prov in g.objects(p, OA.hasBody):
            assert (prov, DCTERMS.conformsTo, URIRef(provenance.CWLPROV_VERSION)) in g
            # NOTE: DC.format is a Namespace method and does not resolve like other terms
            formats.update(set(g.objects(prov, DC["format"])))
        assert formats, "Could not find media types"
        expected = set(Literal(f) for f in (
            "application/json",
            "application/ld+json",
            "application/n-triples",
            'text/provenance-notation; charset="UTF-8"',
            'text/turtle; charset="UTF-8"',
            "application/xml"
        ))
        assert formats == expected, "Did not match expected PROV media types"

    if nested:
        # Check for additional PROVs
        # Let's try to find the other wf run ID
        otherRuns = set()
        for p in g.subjects(OA.motivatedBy, PROV.has_provenance):
            if (p, OA.hasTarget, URIRef(uuid.urn)) in g:
                continue
            otherRuns.update(set(g.objects(p, OA.hasTarget)))
        assert otherRuns, "Could not find nested workflow run prov annotations"

def check_prov(base_path, nested=False, single_tool=False, directory=False,
               secondary_files=False):
    prov_file = os.path.join(base_path, "metadata", "provenance", "primary.cwlprov.nt")
    assert os.path.isfile(prov_file), "Can't find " + prov_file
    arcp_root = find_arcp(base_path)
    # Note: We don't need to include metadata/provenance in the base URI
    # as .nt always uses absolute URIs
    g = Graph()
    with open(prov_file, "rb") as f:
        g.parse(file=f, format="nt", publicID=arcp_root)
    if os.environ.get("DEBUG"):
        print("Parsed %s:\n\n" % prov_file)
        g.serialize(sys.stdout, format="ttl")
    runs = set(g.subjects(RDF.type, WFPROV.WorkflowRun))

    # master workflow run URI (as urn:uuid:) should correspond to arcp uuid part
    uuid = arcp.parse_arcp(arcp_root).uuid
    master_run = URIRef(uuid.urn)
    assert master_run in runs, "Can't find run %s in %s" % (master_run, runs)
    # TODO: we should not need to parse arcp, but follow
    # the has_provenance annotations in manifest.json instead

    # run should have been started by a wf engine

    engines = set(g.subjects(RDF.type, WFPROV.WorkflowEngine))
    assert engines, "Could not find WorkflowEngine"
    assert len(engines) == 1, "Found too many WorkflowEngines: %s" % engines
    engine = engines.pop()

    assert (master_run, PROV.wasAssociatedWith, engine) in g, "Wf run not associated with wf engine"
    assert (engine, RDF.type, PROV.SoftwareAgent) in g, "Engine not declared as SoftwareAgent"

    if single_tool:
        activities = set(g.subjects(RDF.type, PROV.Activity))
        assert len(activities) == 1, "Too many activities: %s" % activities
        # single tool exec, there should be no other activities
        # than the tool run
        # (NOTE: the WorkflowEngine is also an activity, but not declared explicitly)
    else:
        # Check all process runs were started by the master workflow
        stepActivities = set(g.subjects(RDF.type, WFPROV.ProcessRun))
        # Although semantically a WorkflowEngine is also a ProcessRun,
        # we don't declare that,
        # thus only the step activities should be in this set.
        assert master_run not in stepActivities
        assert stepActivities, "No steps executed in workflow"
        for step in stepActivities:
            # Let's check it was started by the master_run. Unfortunately, unlike PROV-N,
            # in PROV-O RDF we have to check through the n-ary qualifiedStart relation
            starts = set(g.objects(step, PROV.qualifiedStart))
            assert starts, "Could not find qualifiedStart of step %s" % step
            assert len(starts) == 1, "Too many qualifiedStart for step %s" % step
            start = starts.pop()
            assert (start, PROV.hadActivity, master_run) in g,\
                "Step activity not started by master activity"
            # Tip: Any nested workflow step executions should not be in this prov file,
            # but in a separate file
    if nested:
        # Find some cwlprov.nt the nested workflow is described in
        prov_ids = set(g.objects(predicate=PROV.has_provenance))
        # FIXME: The above is a bit naive and does not check the subject is
        # one of the steps -- OK for now as this is the only case of prov:has_provenance
        assert prov_ids, "Could not find prov:has_provenance from nested workflow"

        nt_uris = [uri for uri in prov_ids if uri.endswith("cwlprov.nt")]
        # TODO: Look up manifest conformsTo and content-type rather than assuming magic filename
        assert nt_uris, "Could not find *.cwlprov.nt"
        # Load into new graph
        g2 = Graph()
        nt_uri = nt_uris.pop()
        with open(_arcp2file(base_path, nt_uri), "rb") as f:
            g2.parse(file=f, format="nt", publicID=nt_uri)
        # TODO: Check g2 statements that it's the same UUID activity inside
        # as in the outer step
    if directory:
        directories = set(g.subjects(RDF.type, RO.Folder))
        assert directories

        for d in directories:
            assert (d, RDF.type, PROV.Dictionary) in g
            assert (d, RDF.type, PROV.Collection) in g
            assert (d, RDF.type, PROV.Entity) in g

            files = set()
            for entry in g.objects(d, PROV.hadDictionaryMember):
                assert (entry, RDF.type, PROV.KeyEntityPair) in g
                # We don't check what that filename is here
                assert set(g.objects(entry, PROV.pairKey))

                # RO:Folder aspect
                assert set(g.objects(entry, RO.entryName))
                assert (d, ORE.aggregates, entry) in g
                assert (entry, RDF.type, RO.FolderEntry) in g
                assert (entry, RDF.type, ORE.Proxy) in g
                assert (entry, ORE.proxyIn, d) in g

                # Which file?
                entities = set(g.objects(entry, PROV.pairEntity))
                assert entities
                f = entities.pop()
                files.add(f)
                assert (entry, ORE.proxyFor, f) in g
                assert (f, RDF.type, PROV.Entity) in g

            if not files:
                assert (d, RDF.type, PROV.EmptyCollection) in g
                assert (d, RDF.type, PROV.EmptyDictionary) in g
    if secondary_files:
        derivations = set(g.subjects(RDF.type, CWLPROV.SecondaryFile))
        assert derivations
        for der in derivations:
            sec = set(g.subjects(PROV.qualifiedDerivation, der)).pop()
            prim = set(g.objects(der, PROV.entity)).pop()

            # UUID specializes a hash checksum
            assert set(g.objects(sec, PROV.specializationOf))
            # extensions etc.
            sec_basename = set(g.objects(sec, CWLPROV.basename)).pop()
            sec_nameroot = set(g.objects(sec, CWLPROV.nameroot)).pop()
            sec_nameext = set(g.objects(sec, CWLPROV.nameext)).pop()
            assert str(sec_basename) == "%s%s" % (sec_nameroot, sec_nameext)
            # TODO: Check hash data file exists in RO

            # The primary entity should have the same properties, but different values
            assert set(g.objects(prim, PROV.specializationOf))
            prim_basename = set(g.objects(prim, CWLPROV.basename)).pop()
            prim_nameroot = set(g.objects(prim, CWLPROV.nameroot)).pop()
            prim_nameext = set(g.objects(prim, CWLPROV.nameext)).pop()
            assert str(prim_basename) == "%s%s" % (prim_nameroot, prim_nameext)


@pytest.fixture
def research_object():
    re_ob = provenance.ResearchObject(StdFsAccess(''))
    yield re_ob
    re_ob.close()


def test_absolute_path_fails(research_object):
    with pytest.raises(ValueError):
        research_object.write_bag_file("/absolute/path/fails")


def test_climboutfails(research_object):
    with pytest.raises(ValueError):
        research_object.write_bag_file("../../outside-ro")


def test_writable_string(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert file.writable()
        file.write(u"Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    sha1 = os.path.join(research_object.folder, "tagmanifest-sha1.txt")
    assert os.path.isfile(sha1)

    with open(sha1, "r", encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()
    assert stripped_sha.endswith("file.txt")
    # stain@biggie:~/src/cwltool$ echo Hello | sha1sum
    # 1d229271928d3f9e2bb0375bd6ce5db6c6d348d9  -
    assert stripped_sha.startswith("1d229271928d3f9e2bb0375bd6ce5db6c6d348d9")

    sha256 = os.path.join(research_object.folder, "tagmanifest-sha256.txt")
    assert os.path.isfile(sha256)

    with open(sha256, "r", encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()

    assert stripped_sha.endswith("file.txt")
    # stain@biggie:~/src/cwltool$ echo Hello | sha256sum
    # 66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18  -
    assert stripped_sha.startswith("66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18")

    sha512 = os.path.join(research_object.folder, "tagmanifest-sha512.txt")
    assert os.path.isfile(sha512)

def test_writable_unicode_string(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert file.writable()
        file.write(u"Here is a snowman: \u2603 \n")


def test_writable_bytes(research_object):
    string = u"Here is a snowman: \u2603 \n".encode("UTF-8")
    with research_object.write_bag_file("file.txt", encoding=None) as file:
        file.write(string)


def test_data(research_object):
    with research_object.write_bag_file("data/file.txt") as file:
        assert file.writable()
        file.write(u"Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    # Because this is under data/ it should add to manifest
    # rather than tagmanifest
    sha1 = os.path.join(research_object.folder, "manifest-sha1.txt")
    assert os.path.isfile(sha1)
    with open(sha1, "r", encoding="UTF-8") as file:
        stripped_sha = file.readline().strip()
    assert stripped_sha.endswith("data/file.txt")


def test_not_seekable(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert not file.seekable()
        with pytest.raises(IOError):
            file.seek(0)


def test_not_readable(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert not file.readable()
        with pytest.raises(IOError):
            file.read()


def test_truncate_fails(research_object):
    with research_object.write_bag_file("file.txt") as file:
        file.write(u"Hello there")
        file.truncate()  # OK as we're always at end
        # Will fail because the checksum can't rewind
        with pytest.raises(IOError):
            file.truncate(0)


mod_validness = [
    # Taken from "Some sample ORCID iDs" on
    # https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    ("0000-0002-1825-0097", True),
    ("0000-0001-5109-3700", True),
    ("0000-0002-1694-233X", True),
    # dashes optional
    ("0000000218250097", True),
    ("0000000151093700", True),
    ("000000021694233X", True),
    # do not fail on missing digits
    ("0002-1694-233X", True),
    # Swap check-digits around to force error
    ("0000-0002-1825-009X", False),
    ("0000-0001-5109-3707", False),
    ("0000-0002-1694-2330", False)
]


@pytest.mark.parametrize('mod11,valid', mod_validness)
def test_check_mod_11_2(mod11, valid):
    assert provenance._check_mod_11_2(mod11) == valid

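# Illustrative sketch (an assumption, NOT cwltool's actual implementation):
# the checksum exercised above is ISO/IEC 7064 mod 11-2, as used for ORCID iDs,
# computed over the dash-free digits, with a check value of 10 written as "X".
def _mod_11_2_sketch(digits):
    """Hypothetical helper: validate the final check character of a dash-free ORCID."""
    total = 0
    for ch in digits[:-1]:
        total = (total + int(ch)) * 2
    check = (12 - total % 11) % 11
    expected = "X" if check == 10 else str(check)
    return digits[-1].upper() == expected
# Example: _mod_11_2_sketch("0000000218250097") is True, matching the table above.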

orcid_uris = [
    # https://orcid.org/ (expected form)
    ("https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # http://orcid.org
    ("http://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # just the number
    ("0000-0002-1825-0097", "https://orcid.org/0000-0002-1825-0097"),
    # lower-case X is OK (and fixed)
    ("https://orcid.org/0000-0002-1694-233x", "https://orcid.org/0000-0002-1694-233X"),
    # upper-case ORCID.ORG is OK (and fixed)
    ("https://ORCID.ORG/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # Unicode string (Python 2)
    (u"https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X")
]


@pytest.mark.parametrize('orcid,expected', orcid_uris)
def test_valid_orcid(orcid, expected):
    assert provenance._valid_orcid(orcid) == expected


invalid_orcids = [
    # missing digit fails (even if checksum is correct)
    "0002-1694-2332",
    # Wrong checkdigit fails
    "https://orcid.org/0000-0002-1694-2332",
    "0000-0002-1694-2332",
    # Missing dashes fails (although that's OK for checksum)
    "https://orcid.org/000000021694233X",
    "000000021694233X",
    # Wrong hostname fails
    "https://example.org/0000-0002-1694-233X",
    # Wrong protocol fails
    "ftp://orcid.org/0000-0002-1694-233X",
    # Trying to be clever fails (no URL parsing!)
    "https://orcid.org:443/0000-0002-1694-233X",
    "http://orcid.org:80/0000-0002-1694-233X",
    # Empty string is not really valid
    ""
]


@pytest.mark.parametrize('orcid', invalid_orcids)
def test_invalid_orcid(orcid):
    with pytest.raises(ValueError):
        provenance._valid_orcid(orcid)

def test_whoami():
    username, fullname = provenance._whoami()
    assert username and isinstance(username, str)
    assert fullname and isinstance(fullname, str)


def test_research_object():
    # TODO: Test ResearchObject methods
    pass


# The research object may need to be pickled (for Toil)
def test_research_object_picklability(research_object):
    assert pickle.dumps(research_object) is not None