comparison planemo/lib/python3.7/site-packages/cwltool/tests/test_provenance.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
1 import json
2 import ntpath
3 import os
4 import posixpath
5 import shutil
6 import sys
7 import tempfile
8 from io import open
9 try:
10 import cPickle as pickle
11 except ImportError:
12 import pickle
13
14 from six.moves import urllib
15
16 import arcp
17 import pytest
18 from rdflib import Graph, Literal, Namespace, URIRef
19 from rdflib.namespace import DC, DCTERMS, RDF
20
21 import bagit
22 # Module to be tested
23 from cwltool import load_tool, provenance
24 from cwltool.main import main
25 from cwltool.resolver import Path
26 from cwltool.context import RuntimeContext
27 from cwltool.stdfsaccess import StdFsAccess
28
29 from .util import get_data, needs_docker, temp_dir, working_directory
30
31 # RDF namespaces we'll query for later
32 ORE = Namespace("http://www.openarchives.org/ore/terms/")
33 PROV = Namespace("http://www.w3.org/ns/prov#")
34 RO = Namespace("http://purl.org/wf4ever/ro#")
35 WFDESC = Namespace("http://purl.org/wf4ever/wfdesc#")
36 WFPROV = Namespace("http://purl.org/wf4ever/wfprov#")
37 SCHEMA = Namespace("http://schema.org/")
38 CWLPROV = Namespace("https://w3id.org/cwl/prov#")
39 OA = Namespace("http://www.w3.org/ns/oa#")
40
41
42 @pytest.fixture
43 def folder(tmpdir):
44 directory = str(tmpdir)
45 if os.environ.get("DEBUG"):
46 print("%s folder: %s" % (__name__, directory))
47 yield directory
48
49 if not os.environ.get("DEBUG"):
50 shutil.rmtree(directory)
51
52
53 def cwltool(folder, *args):
54 new_args = ['--provenance', folder]
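# --provenance <folder> tells cwltool to capture the run as a CWLProv
# research object (a BagIt bag with data/, metadata/, workflow/ and
# snapshot/ folders) rooted at <folder>; the check_* helpers below
# inspect that layout.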
55 new_args.extend(args)
56 # Run within a temporary directory to not pollute git checkout
57 with temp_dir("cwltool-run") as tmp_dir:
58 with working_directory(tmp_dir):
59 status = main(new_args)
60 assert status == 0, "Failed: cwltool.main(%r)" % (args,)
61
62 @needs_docker
63 def test_hello_workflow(folder):
64 cwltool(folder, get_data('tests/wf/hello-workflow.cwl'), "--usermessage", "Hello workflow")
65 check_provenance(folder)
66
67 @needs_docker
68 def test_hello_single_tool(folder):
69 cwltool(folder, get_data('tests/wf/hello_single_tool.cwl'), "--message", "Hello tool")
70 check_provenance(folder, single_tool=True)
71
72 @needs_docker
73 def test_revsort_workflow(folder):
74 cwltool(folder, get_data('tests/wf/revsort.cwl'), get_data('tests/wf/revsort-job.json'))
75 check_output_object(folder)
76 check_provenance(folder)
77
78 @needs_docker
79 def test_nested_workflow(folder):
80 cwltool(folder, get_data('tests/wf/nested.cwl'))
81 check_provenance(folder, nested=True)
82
83 @needs_docker
84 def test_secondary_files_implicit(folder, tmpdir):
85 file1 = tmpdir.join("foo1.txt")
86 file1idx = tmpdir.join("foo1.txt.idx")
87
88 with open(str(file1), "w", encoding="ascii") as f:
89 f.write(u"foo")
90 with open(str(file1idx), "w", encoding="ascii") as f:
91 f.write(u"bar")
92
93 # secondary will be picked up by .idx
94 cwltool(folder, get_data('tests/wf/sec-wf.cwl'), "--file1", str(file1))
95 check_provenance(folder, secondary_files=True)
96 check_secondary_files(folder)
97
98 @needs_docker
99 def test_secondary_files_explicit(folder, tmpdir):
100 orig_tempdir = tempfile.tempdir
101 tempfile.tempdir = str(tmpdir)
102 # Deliberately do NOT have common basename or extension
103 file1 = tempfile.mktemp("foo")
104 file1idx = tempfile.mktemp("bar")
105
106 with open(file1, "w", encoding="ascii") as f:
107 f.write(u"foo")
108 with open(file1idx, "w", encoding="ascii") as f:
109 f.write(u"bar")
110
111 # explicit secondaryFiles
112 job = {"file1":
113 {"class": "File",
114 "path": file1,
115 "basename": "foo1.txt",
116 "secondaryFiles": [
117 {
118 "class": "File",
119 "path": file1idx,
120 "basename": "foo1.txt.idx",
121 }
122 ]
123 }
124 }
125 jobJson = tempfile.mktemp("job.json")
126 with open(jobJson, "wb") as fp:
127 j = json.dumps(job, ensure_ascii=True)
128 fp.write(j.encode("ascii"))
129
130 cwltool(folder, get_data('tests/wf/sec-wf.cwl'), jobJson)
131 check_provenance(folder, secondary_files=True)
132 check_secondary_files(folder)
133 tempfile.tempdir = orig_tempdir
134
135 @needs_docker
136 def test_secondary_files_output(folder):
137 # secondary will be picked up by .idx
138 cwltool(folder, get_data('tests/wf/sec-wf-out.cwl'))
139 check_provenance(folder, secondary_files=True)
140 # Skipped, not the same secondary files as above
141 # check_secondary_files(folder)
142
143 @needs_docker
144 def test_directory_workflow(folder, tmpdir):
145 dir2 = tmpdir.join("dir2")
146 os.makedirs(str(dir2))
147 sha1 = {
148 # Expected hashes of ASCII letters (no linefeed)
149 # as returned from:
150 # for x in a b c ; do echo -n $x | sha1sum ; done
151 "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
152 "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
153 "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
154 }
155 for x in u"abc":
156 # Make test files with predictable hashes
157 with open(str(dir2.join(x)), "w", encoding="ascii") as f:
158 f.write(x)
159
160 cwltool(folder, get_data('tests/wf/directory.cwl'), "--dir", str(dir2))
161 check_provenance(folder, directory=True)
162
163 # Output should include ls stdout of filenames a b c on each line
164 file_list = os.path.join(
165 folder, "data",
166 # checksum as returned from:
167 # echo -e "a\nb\nc" | sha1sum
168 # 3ca69e8d6c234a469d16ac28a4a658c92267c423 -
169 "3c",
170 "3ca69e8d6c234a469d16ac28a4a658c92267c423")
171 assert os.path.isfile(file_list)
172
173 # Input files should be captured by hash value,
174 # even if they were inside a class: Directory
175 for (l, l_hash) in sha1.items():
176 prefix = l_hash[:2] # first 2 letters
177 p = os.path.join(folder, "data", prefix, l_hash)
178 assert os.path.isfile(p), "Could not find %s as %s" % (l, p)
179
180 def check_output_object(base_path):
181 output_obj = os.path.join(base_path, "workflow", "primary-output.json")
182 compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284"
183 compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284"
184 with open(output_obj) as fp:
185 out_json = json.load(fp)
186 f1 = out_json["sorted_output"]
187 assert f1["checksum"] == compare_checksum
188 assert f1["location"] == compare_location
189
190
191 def check_secondary_files(base_path):
192 foo_data = os.path.join(
193 base_path, "data",
194 # checksum as returned from:
195 # $ echo -n foo | sha1sum
196 # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 -
197 "0b",
198 "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
199 bar_data = os.path.join(
200 base_path, "data", "62", "62cdb7020ff920e5aa642c3d4066950dd1f01f4d")
201 assert os.path.isfile(foo_data), "Did not capture file.txt 'foo'"
202 assert os.path.isfile(bar_data), "Did not capture secondary file.txt.idx 'bar'"
203
204 primary_job = os.path.join(base_path, "workflow", "primary-job.json")
205 with open(primary_job) as fp:
206 job_json = json.load(fp)
207 # TODO: Verify secondaryFile in primary-job.json
208 f1 = job_json["file1"]
209 assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
210 assert f1["basename"] == "foo1.txt"
211
212 secondaries = f1["secondaryFiles"]
213 assert secondaries
214 f1idx = secondaries[0]
215 assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
216 assert f1idx["basename"] == "foo1.txt.idx"
217
218 def check_provenance(base_path, nested=False, single_tool=False, directory=False,
219 secondary_files=False):
220 check_folders(base_path)
221 check_bagit(base_path)
222 check_ro(base_path, nested=nested)
223 check_prov(base_path, nested=nested, single_tool=single_tool, directory=directory,
224 secondary_files=secondary_files)
225
226 def check_folders(base_path):
227 required_folders = [
228 "data", "snapshot", "workflow", "metadata", os.path.join("metadata", "provenance")]
229
230 for folder in required_folders:
231 assert os.path.isdir(os.path.join(base_path, folder))
232
233 def check_bagit(base_path):
234 # check bagit structure
235 required_files = [
236 "bagit.txt", "bag-info.txt", "manifest-sha1.txt",
237 "tagmanifest-sha1.txt", "tagmanifest-sha256.txt"]
238
239 for basename in required_files:
240 file_path = os.path.join(base_path, basename)
241 assert os.path.isfile(file_path)
242
243 bag = bagit.Bag(base_path)
244 assert bag.has_oxum()
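# Payload-Oxum in bag-info.txt has the form "<octet count>.<file count>"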
245 (only_manifest, only_fs) = bag.compare_manifests_with_fs()
246 assert not list(only_manifest), "Some files only in manifest"
247 assert not list(only_fs), "Some files only on file system"
248 missing_tagfiles = bag.missing_optional_tagfiles()
249 assert not list(missing_tagfiles), "Some files only in tagmanifest"
250 bag.validate()
251 # TODO: Check other bag-info attributes
252 assert arcp.is_arcp_uri(bag.info.get("External-Identifier"))
253
254 def find_arcp(base_path):
255 # First try to find External-Identifier
256 bag = bagit.Bag(base_path)
257 ext_id = bag.info.get("External-Identifier")
258 if arcp.is_arcp_uri(ext_id):
259 return ext_id
260 raise Exception("Can't find External-Identifier")
261
262 def _arcp2file(base_path, uri):
263 parsed = arcp.parse_arcp(uri)
264 # arcp URIs, ensure they are local to our RO
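# e.g. (illustrative values) arcp://uuid,<run-uuid>/data/0b/<sha1>
# maps to os.path.join(base_path, "data", "0b", "<sha1>")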
265 assert parsed.uuid == arcp.parse_arcp(find_arcp(base_path)).uuid,\
266 'arcp URI must be local to the research object'
267
268 path = parsed.path[1:] # Strip first /
269 # Convert to local path, in case it uses \ on Windows
270 lpath = str(Path(path))
271 return os.path.join(base_path, lpath)
272
273 def check_ro(base_path, nested=False):
274 manifest_file = os.path.join(base_path, "metadata", "manifest.json")
275 assert os.path.isfile(manifest_file), "Can't find " + manifest_file
276 arcp_root = find_arcp(base_path)
277 base = urllib.parse.urljoin(arcp_root, "metadata/manifest.json")
278 g = Graph()
279
280 # Avoid resolving JSON-LD context https://w3id.org/bundle/context
281 # so this test works offline
282 context = Path(get_data("tests/bundle-context.jsonld")).as_uri()
283 with open(manifest_file, "r", encoding="UTF-8") as f:
284 jsonld = f.read()
285 # replace with file:/// URI
286 jsonld = jsonld.replace("https://w3id.org/bundle/context", context)
287 g.parse(data=jsonld, format="json-ld", publicID=base)
288 if os.environ.get("DEBUG"):
289 print("Parsed manifest:\n\n")
290 g.serialize(sys.stdout, format="ttl")
291 ro = None
292
293 for ro in g.subjects(ORE.isDescribedBy, URIRef(base)):
294 break
295 assert ro is not None, "Can't find RO with ore:isDescribedBy"
296
297 profile = None
298 for dc in g.objects(ro, DCTERMS.conformsTo):
299 profile = dc
300 break
301 assert profile is not None, "Can't find profile with dct:conformsTo"
302 assert profile == URIRef(provenance.CWLPROV_VERSION),\
303 "Unexpected cwlprov version " + profile
304
305 paths = []
306 externals = []
307 for aggregate in g.objects(ro, ORE.aggregates):
308 if not arcp.is_arcp_uri(aggregate):
309 externals.append(aggregate)
310 # Won't check external URIs existence here
311 # TODO: Check they are not relative!
312 continue
313 lfile = _arcp2file(base_path, aggregate)
314 paths.append(os.path.relpath(lfile, base_path))
315 assert os.path.isfile(lfile), "Can't find aggregated " + lfile
316
317 assert paths, "Didn't find any arcp aggregates"
318 assert externals, "Didn't find any data URIs"
319
320 for ext in ["provn", "xml", "json", "jsonld", "nt", "ttl"]:
321 f = "metadata/provenance/primary.cwlprov.%s" % ext
322 assert f in paths, "provenance file missing " + f
323
324 for f in ["workflow/primary-job.json", "workflow/packed.cwl", "workflow/primary-output.json"]:
325 assert f in paths, "workflow file missing " + f
326 # Can't test snapshot/ files directly as their name varies
327
328 # TODO: check urn:hash::sha1 thingies
329 # TODO: Check OA annotations
330
331 packed = urllib.parse.urljoin(arcp_root, "/workflow/packed.cwl")
332 primary_job = urllib.parse.urljoin(arcp_root, "/workflow/primary-job.json")
333 primary_prov_nt = urllib.parse.urljoin(arcp_root, "/metadata/provenance/primary.cwlprov.nt")
334 uuid = arcp.parse_arcp(arcp_root).uuid
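# uuid.urn is the urn:uuid:... form of the run ID; the annotations below
# use it as their oa:hasTarget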
335
336 highlights = set(g.subjects(OA.motivatedBy, OA.highlighting))
337 assert highlights, "Didn't find highlights"
338 for h in highlights:
339 assert (h, OA.hasTarget, URIRef(packed)) in g
340
341 describes = set(g.subjects(OA.motivatedBy, OA.describing))
342 for d in describes:
343 assert (d, OA.hasBody, URIRef(arcp_root)) in g
344 assert (d, OA.hasTarget, URIRef(uuid.urn)) in g
345
346 linked = set(g.subjects(OA.motivatedBy, OA.linking))
347 for l in linked:
348 assert (l, OA.hasBody, URIRef(packed)) in g
349 assert (l, OA.hasBody, URIRef(primary_job)) in g
350 assert (l, OA.hasTarget, URIRef(uuid.urn)) in g
351
352 has_provenance = set(g.subjects(OA.hasBody, URIRef(primary_prov_nt)))
353 for p in has_provenance:
354 assert (p, OA.hasTarget, URIRef(uuid.urn)) in g
355 assert (p, OA.motivatedBy, PROV.has_provenance) in g
356 # Check all prov elements are listed
357 formats = set()
358 for prov in g.objects(p, OA.hasBody):
359 assert (prov, DCTERMS.conformsTo, URIRef(provenance.CWLPROV_VERSION)) in g
360 # NOTE: DC.format is a Namespace method and does not resolve like other terms
361 formats.update(set(g.objects(prov, DC["format"])))
362 assert formats, "Could not find media types"
363 expected = set(Literal(f) for f in (
364 "application/json",
365 "application/ld+json",
366 "application/n-triples",
367 'text/provenance-notation; charset="UTF-8"',
368 'text/turtle; charset="UTF-8"',
369 "application/xml"
370 ))
371 assert formats == expected, "Did not match expected PROV media types"
372
373 if nested:
374 # Check for additional PROVs
375 # Let's try to find the other wf run ID
376 otherRuns = set()
377 for p in g.subjects(OA.motivatedBy, PROV.has_provenance):
378 if (p, OA.hasTarget, URIRef(uuid.urn)) in g:
379 continue
380 otherRuns.update(set(g.objects(p, OA.hasTarget)))
381 assert otherRuns, "Could not find nested workflow run prov annotations"
382
383 def check_prov(base_path, nested=False, single_tool=False, directory=False,
384 secondary_files=False):
385 prov_file = os.path.join(base_path, "metadata", "provenance", "primary.cwlprov.nt")
386 assert os.path.isfile(prov_file), "Can't find " + prov_file
387 arcp_root = find_arcp(base_path)
388 # Note: We don't need to include metadata/provenance in the base URI
389 # as .nt files always use absolute URIs
390 g = Graph()
391 with open(prov_file, "rb") as f:
392 g.parse(file=f, format="nt", publicID=arcp_root)
393 if os.environ.get("DEBUG"):
394 print("Parsed %s:\n\n" % prov_file)
395 g.serialize(sys.stdout, format="ttl")
396 runs = set(g.subjects(RDF.type, WFPROV.WorkflowRun))
397
398 # master workflow run URI (as urn:uuid:) should correspond to arcp uuid part
399 uuid = arcp.parse_arcp(arcp_root).uuid
400 master_run = URIRef(uuid.urn)
401 assert master_run in runs, "Can't find run %s in %s" % (master_run, runs)
402 # TODO: we should not need to parse arcp, but follow
403 # the has_provenance annotations in manifest.json instead
404
405 # run should have been started by a wf engine
406
407 engines = set(g.subjects(RDF.type, WFPROV.WorkflowEngine))
408 assert engines, "Could not find WorkflowEngine"
409 assert len(engines) == 1, "Found too many WorkflowEngines: %s" % engines
410 engine = engines.pop()
411
412 assert (master_run, PROV.wasAssociatedWith, engine) in g, "Wf run not associated with wf engine"
413 assert (engine, RDF.type, PROV.SoftwareAgent) in g, "Engine not declared as SoftwareAgent"
414
415 if single_tool:
416 activities = set(g.subjects(RDF.type, PROV.Activity))
417 assert len(activities) == 1, "Too many activities: %s" % activities
418 # single tool exec, there should be no other activities
419 # than the tool run
420 # (NOTE: the WorkflowEngine is also an activity, but not declared explicitly)
421 else:
422 # Check all process runs were started by the master workflow
423 stepActivities = set(g.subjects(RDF.type, WFPROV.ProcessRun))
424 # Although semantically a WorkflowEngine is also a ProcessRun,
425 # we don't declare that,
426 # thus only the step activities should be in this set.
427 assert master_run not in stepActivities
428 assert stepActivities, "No steps executed in workflow"
429 for step in stepActivities:
430 # Let's check it was started by the master_run. Unfortunately, unlike PROV-N
431 # in PROV-O RDF we have to check through the n-ary qualifiedStart relation
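# i.e. we expect the pattern (blank node name illustrative):
#   <step> prov:qualifiedStart _:start .
#   _:start prov:hadActivity <master_run> .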
432 starts = set(g.objects(step, PROV.qualifiedStart))
433 assert starts, "Could not find qualifiedStart of step %s" % step
434 assert len(starts) == 1, "Too many qualifiedStart for step %s" % step
435 start = starts.pop()
436 assert (start, PROV.hadActivity, master_run) in g,\
437 "Step activity not started by master activity"
438 # Tip: Any nested workflow step executions should not be in this prov file,
439 # but in a separate file
440 if nested:
441 # Find some cwlprov.nt the nested workflow is described in
442 prov_ids = set(g.objects(predicate=PROV.has_provenance))
443 # FIXME: The above is a bit naive and does not check the subject is
444 # one of the steps -- OK for now as this is the only case of prov:has_provenance
445 assert prov_ids, "Could not find prov:has_provenance from nested workflow"
446
447 nt_uris = [uri for uri in prov_ids if uri.endswith("cwlprov.nt")]
448 # TODO: Look up manifest conformsTo and content-type rather than assuming magic filename
449 assert nt_uris, "Could not find *.cwlprov.nt"
450 # Load into new graph
451 g2 = Graph()
452 nt_uri = nt_uris.pop()
453 with open(_arcp2file(base_path, nt_uri), "rb") as f:
454 g2.parse(file=f, format="nt", publicID=nt_uri)
455 # TODO: Check g2 statements that it's the same UUID activity inside
456 # as in the outer step
457 if directory:
458 directories = set(g.subjects(RDF.type, RO.Folder))
459 assert directories
460
461 for d in directories:
462 assert (d, RDF.type, PROV.Dictionary) in g
463 assert (d, RDF.type, PROV.Collection) in g
464 assert (d, RDF.type, PROV.Entity) in g
465
466 files = set()
467 for entry in g.objects(d, PROV.hadDictionaryMember):
468 assert (entry, RDF.type, PROV.KeyEntityPair) in g
469 # We don't check what that filename is here
470 assert set(g.objects(entry, PROV.pairKey))
471
472 # RO:Folder aspect
473 assert set(g.objects(entry, RO.entryName))
474 assert (d, ORE.aggregates, entry) in g
475 assert (entry, RDF.type, RO.FolderEntry) in g
476 assert (entry, RDF.type, ORE.Proxy) in g
477 assert (entry, ORE.proxyIn, d) in g
479
480 # Which file?
481 entities = set(g.objects(entry, PROV.pairEntity))
482 assert entities
483 f = entities.pop()
484 files.add(f)
485 assert (entry, ORE.proxyFor, f) in g
486 assert (f, RDF.type, PROV.Entity) in g
487
488 if not files:
489 assert (d, RDF.type, PROV.EmptyCollection) in g
490 assert (d, RDF.type, PROV.EmptyDictionary) in g
491 if secondary_files:
492 derivations = set(g.subjects(RDF.type, CWLPROV.SecondaryFile))
493 assert derivations
494 for der in derivations:
495 sec = set(g.subjects(PROV.qualifiedDerivation, der)).pop()
496 prim = set(g.objects(der, PROV.entity)).pop()
497
498 # UUID specializes a hash checksum
499 assert set(g.objects(sec, PROV.specializationOf))
500 # extensions etc.
501 sec_basename = set(g.objects(sec, CWLPROV.basename)).pop()
502 sec_nameroot = set(g.objects(sec, CWLPROV.nameroot)).pop()
503 sec_nameext = set(g.objects(sec, CWLPROV.nameext)).pop()
504 assert str(sec_basename) == "%s%s" % (sec_nameroot, sec_nameext)
505 # TODO: Check that the hash data file exists in the RO
506
507 # The primary entity should have the same properties, but with different values
508 assert set(g.objects(prim, PROV.specializationOf))
509 prim_basename = set(g.objects(prim, CWLPROV.basename)).pop()
510 prim_nameroot = set(g.objects(prim, CWLPROV.nameroot)).pop()
511 prim_nameext = set(g.objects(prim, CWLPROV.nameext)).pop()
512 assert str(prim_basename) == "%s%s" % (prim_nameroot, prim_nameext)
513
514
515 @pytest.fixture
516 def research_object():
517 re_ob = provenance.ResearchObject(StdFsAccess(''))
518 yield re_ob
519 re_ob.close()
520
521 def test_absolute_path_fails(research_object):
522 with pytest.raises(ValueError):
523 research_object.write_bag_file("/absolute/path/fails")
524
525 def test_climboutfails(research_object):
526 with pytest.raises(ValueError):
527 research_object.write_bag_file("../../outside-ro")
528
529 def test_writable_string(research_object):
530 with research_object.write_bag_file("file.txt") as file:
531 assert file.writable()
532 file.write(u"Hello\n")
533 # TODO: Check Windows does not modify \n to \r\n here
534
535 sha1 = os.path.join(research_object.folder, "tagmanifest-sha1.txt")
536 assert os.path.isfile(sha1)
537
538 with open(sha1, "r", encoding="UTF-8") as sha_file:
539 stripped_sha = sha_file.readline().strip()
540 assert stripped_sha.endswith("file.txt")
541 #stain@biggie:~/src/cwltool$ echo Hello | sha1sum
542 #1d229271928d3f9e2bb0375bd6ce5db6c6d348d9 -
543 assert stripped_sha.startswith("1d229271928d3f9e2bb0375bd6ce5db6c6d348d9")
544
545 sha256 = os.path.join(research_object.folder, "tagmanifest-sha256.txt")
546 assert os.path.isfile(sha256)
547
548 with open(sha256, "r", encoding="UTF-8") as sha_file:
549 stripped_sha = sha_file.readline().strip()
550
551 assert stripped_sha.endswith("file.txt")
552 #stain@biggie:~/src/cwltool$ echo Hello | sha256sum
553 #66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18 -
554 assert stripped_sha.startswith("66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18")
555
556 sha512 = os.path.join(research_object.folder, "tagmanifest-sha512.txt")
557 assert os.path.isfile(sha512)
558
559 def test_writable_unicode_string(research_object):
560 with research_object.write_bag_file("file.txt") as file:
561 assert file.writable()
562 file.write(u"Here is a snowman: \u2603 \n")
563
564 def test_writable_bytes(research_object):
565 string = u"Here is a snowman: \u2603 \n".encode("UTF-8")
566 with research_object.write_bag_file("file.txt", encoding=None) as file:
567 file.write(string)
568
569 def test_data(research_object):
570 with research_object.write_bag_file("data/file.txt") as file:
571 assert file.writable()
572 file.write(u"Hello\n")
573 # TODO: Check Windows does not modify \n to \r\n here
574
575 # Because this is under data/ it should be added to the manifest
576 # rather than the tagmanifest
577 sha1 = os.path.join(research_object.folder, "manifest-sha1.txt")
578 assert os.path.isfile(sha1)
579 with open(sha1, "r", encoding="UTF-8") as file:
580 stripped_sha = file.readline().strip()
581 assert stripped_sha.endswith("data/file.txt")
582
583 def test_not_seekable(research_object):
584 with research_object.write_bag_file("file.txt") as file:
585 assert not file.seekable()
586 with pytest.raises(IOError):
587 file.seek(0)
588
589 def test_not_readable(research_object):
590 with research_object.write_bag_file("file.txt") as file:
591 assert not file.readable()
592 with pytest.raises(IOError):
593 file.read()
594
595 def test_truncate_fails(research_object):
596 with research_object.write_bag_file("file.txt") as file:
597 file.write(u"Hello there")
598 file.truncate() # OK as we're always at end
599 # Will fail because the checksum can't rewind
600 with pytest.raises(IOError):
601 file.truncate(0)
602
603
604 mod_validness = [
605 # Taken from "Some sample ORCID iDs" on
606 # https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
607 ("0000-0002-1825-0097", True),
608 ("0000-0001-5109-3700", True),
609 ("0000-0002-1694-233X", True),
610 # dashes optional
611 ("0000000218250097", True),
612 ("0000000151093700", True),
613 ("000000021694233X", True),
614 # do not fail on missing digits
615 ("0002-1694-233X", True),
616 # Swap check-digits around to force error
617 ("0000-0002-1825-009X", False),
618 ("0000-0001-5109-3707", False),
619 ("0000-0002-1694-2330", False)
620 ]
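# The final character of an ORCID iD is an ISO 7064 MOD 11-2 check digit
# computed over the preceding digits; "X" encodes a check value of 10.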
621
622 @pytest.mark.parametrize('mod11,valid', mod_validness)
623 def test_check_mod_11_2(mod11, valid):
624 assert provenance._check_mod_11_2(mod11) == valid
625
626
627 orcid_uris = [
628 # https://orcid.org/ (Expected form)
629 ("https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
630 # orcid.org
631 ("http://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
632 # just the number
633 ("0000-0002-1825-0097", "https://orcid.org/0000-0002-1825-0097"),
634 # lower-case X is OK (and fixed)
635 ("https://orcid.org/0000-0002-1694-233x", "https://orcid.org/0000-0002-1694-233X"),
636 # upper-case ORCID.ORG is OK.. (and fixed)
637 ("https://ORCID.ORG/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
638 # Unicode string (Python 2)
639 (u"https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X")
640 ]
641
642 @pytest.mark.parametrize('orcid,expected', orcid_uris)
643 def test_valid_orcid(orcid, expected):
644 assert provenance._valid_orcid(orcid) == expected
645
646
647 invalid_orcids = [
648 # missing digit fails (even if checksum is correct)
649 "0002-1694-2332",
650 # Wrong checkdigit fails
651 "https://orcid.org/0000-0002-1694-2332",
652 "0000-0002-1694-2332",
653 # Missing dashes fails (although that's OK for checksum)
654 "https://orcid.org/000000021694233X",
655 "000000021694233X",
656 # Wrong hostname fails
657 "https://example.org/0000-0002-1694-233X",
658 # Wrong protocol fails
659 "ftp://orcid.org/0000-0002-1694-233X",
660 # Trying to be clever fails (no URL parsing!)
661 "https://orcid.org:443/0000-0002-1694-233X",
662 "http://orcid.org:80/0000-0002-1694-233X",
663 # Empty string is not really valid
664 ""
665 ]
666
667 @pytest.mark.parametrize('orcid', invalid_orcids)
668 def test_invalid_orcid(orcid):
669 with pytest.raises(ValueError):
670 provenance._valid_orcid(orcid)
671
672 def test_whoami():
673 username, fullname = provenance._whoami()
674 assert username and isinstance(username, str)
675 assert fullname and isinstance(fullname, str)
676
677 def test_research_object():
678 # TODO: Test ResearchObject methods
679 pass
680
681 # Research object may need to be pickled (for Toil)
682 def test_research_object_picklability(research_object):
683 assert pickle.dumps(research_object) is not None