diff planemo/lib/python3.7/site-packages/cwltool/tests/test_provenance.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
| --- | --- |
| date | Fri, 31 Jul 2020 00:18:57 -0400 (2020-07-31) |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/cwltool/tests/test_provenance.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,683 @@

import json
import ntpath
import os
import posixpath
import shutil
import sys
import tempfile
from io import open

try:
    import cPickle as pickle
except ImportError:
    import pickle

from six.moves import urllib

import arcp
import pytest
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import DC, DCTERMS, RDF

import bagit

# Module to be tested
from cwltool import load_tool, provenance
from cwltool.main import main
from cwltool.resolver import Path
from cwltool.context import RuntimeContext
from cwltool.stdfsaccess import StdFsAccess

from .util import get_data, needs_docker, temp_dir, working_directory

# RDF namespaces we'll query for later
ORE = Namespace("http://www.openarchives.org/ore/terms/")
PROV = Namespace("http://www.w3.org/ns/prov#")
RO = Namespace("http://purl.org/wf4ever/ro#")
WFDESC = Namespace("http://purl.org/wf4ever/wfdesc#")
WFPROV = Namespace("http://purl.org/wf4ever/wfprov#")
SCHEMA = Namespace("http://schema.org/")
CWLPROV = Namespace("https://w3id.org/cwl/prov#")
OA = Namespace("http://www.w3.org/ns/oa#")


@pytest.fixture
def folder(tmpdir):
    directory = str(tmpdir)
    if os.environ.get("DEBUG"):
        print("%s folder: %s" % (__loader__.fullname, directory))
    yield directory

    if not os.environ.get("DEBUG"):
        shutil.rmtree(directory)


def cwltool(folder, *args):
    new_args = ['--provenance', folder]
    new_args.extend(args)
    # Run within a temporary directory to not pollute git checkout
    with temp_dir("cwltool-run") as tmp_dir:
        with working_directory(tmp_dir):
            status = main(new_args)
            assert status == 0, "Failed: cwltool.main(%r)" % (args,)


@needs_docker
def test_hello_workflow(folder):
    cwltool(folder, get_data('tests/wf/hello-workflow.cwl'), "--usermessage", "Hello workflow")
    check_provenance(folder)


@needs_docker
def test_hello_single_tool(folder):
    cwltool(folder, get_data('tests/wf/hello_single_tool.cwl'), "--message", "Hello tool")
    check_provenance(folder, single_tool=True)


@needs_docker
def test_revsort_workflow(folder):
    cwltool(folder, get_data('tests/wf/revsort.cwl'), get_data('tests/wf/revsort-job.json'))
    check_output_object(folder)
    check_provenance(folder)


@needs_docker
def test_nested_workflow(folder):
    cwltool(folder, get_data('tests/wf/nested.cwl'))
    check_provenance(folder, nested=True)


@needs_docker
def test_secondary_files_implicit(folder, tmpdir):
    file1 = tmpdir.join("foo1.txt")
    file1idx = tmpdir.join("foo1.txt.idx")

    with open(str(file1), "w", encoding="ascii") as f:
        f.write(u"foo")
    with open(str(file1idx), "w", encoding="ascii") as f:
        f.write(u"bar")

    # secondary will be picked up by .idx
    cwltool(folder, get_data('tests/wf/sec-wf.cwl'), "--file1", str(file1))
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)


@needs_docker
def test_secondary_files_explicit(folder, tmpdir):
    orig_tempdir = tempfile.tempdir
    tempfile.tempdir = str(tmpdir)
    # Deliberately do NOT have common basename or extension
    file1 = tempfile.mktemp("foo")
    file1idx = tempfile.mktemp("bar")

    with open(file1, "w", encoding="ascii") as f:
        f.write(u"foo")
    with open(file1idx, "w", encoding="ascii") as f:
        f.write(u"bar")

    # explicit secondaryFiles
    job = {
        "file1": {
            "class": "File",
            "path": file1,
            "basename": "foo1.txt",
"secondaryFiles": [ + { + "class": "File", + "path": file1idx, + "basename": "foo1.txt.idx", + } + ] + } + } + jobJson = tempfile.mktemp("job.json") + with open(jobJson, "wb") as fp: + j = json.dumps(job, ensure_ascii=True) + fp.write(j.encode("ascii")) + + cwltool(folder, get_data('tests/wf/sec-wf.cwl'), jobJson) + check_provenance(folder, secondary_files=True) + check_secondary_files(folder) + tempfile.tempdir = orig_tempdir + +@needs_docker +def test_secondary_files_output(folder): + # secondary will be picked up by .idx + cwltool(folder, get_data('tests/wf/sec-wf-out.cwl')) + check_provenance(folder, secondary_files=True) + # Skipped, not the same secondary files as above + #self.check_secondary_files() + +@needs_docker +def test_directory_workflow(folder, tmpdir): + dir2 = tmpdir.join("dir2") + os.makedirs(str(dir2)) + sha1 = { + # Expected hashes of ASCII letters (no linefeed) + # as returned from: + # for x in a b c ; do echo -n $x | sha1sum ; done + "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8", + "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98", + "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4", + } + for x in u"abc": + # Make test files with predictable hashes + with open(str(dir2.join(x)), "w", encoding="ascii") as f: + f.write(x) + + cwltool(folder, get_data('tests/wf/directory.cwl'), "--dir", str(dir2)) + check_provenance(folder, directory=True) + + # Output should include ls stdout of filenames a b c on each line + file_list = os.path.join( + folder, "data", + # checksum as returned from: + # echo -e "a\nb\nc" | sha1sum + # 3ca69e8d6c234a469d16ac28a4a658c92267c423 - + "3c", + "3ca69e8d6c234a469d16ac28a4a658c92267c423") + assert os.path.isfile(file_list) + + # Input files should be captured by hash value, + # even if they were inside a class: Directory + for (l, l_hash) in sha1.items(): + prefix = l_hash[:2] # first 2 letters + p = os.path.join(folder, "data", prefix, l_hash) + assert os.path.isfile(p), "Could not find %s as %s" % (l, p) + +def check_output_object(base_path): + output_obj = os.path.join(base_path, "workflow", "primary-output.json") + compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284" + compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284" + with open(output_obj) as fp: + out_json = json.load(fp) + f1 = out_json["sorted_output"] + assert f1["checksum"] == compare_checksum + assert f1["location"] == compare_location + + +def check_secondary_files(base_path): + foo_data = os.path.join( + base_path, "data", + # checksum as returned from: + # $ echo -n foo | sha1sum + # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 - + "0b", + "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") + bar_data = os.path.join( + base_path, "data", "62", "62cdb7020ff920e5aa642c3d4066950dd1f01f4d") + assert os.path.isfile(foo_data), "Did not capture file.txt 'foo'" + assert os.path.isfile(bar_data), "Did not capture secondary file.txt.idx 'bar" + + primary_job = os.path.join(base_path, "workflow", "primary-job.json") + with open(primary_job) as fp: + job_json = json.load(fp) + # TODO: Verify secondaryFile in primary-job.json + f1 = job_json["file1"] + assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33" + assert f1["basename"] == "foo1.txt" + + secondaries = f1["secondaryFiles"] + assert secondaries + f1idx = secondaries[0] + assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d" + assert f1idx["basename"], "foo1.txt.idx" + +def check_provenance(base_path, nested=False, single_tool=False, directory=False, + 


def check_provenance(base_path, nested=False, single_tool=False, directory=False,
                     secondary_files=False):
    check_folders(base_path)
    check_bagit(base_path)
    check_ro(base_path, nested=nested)
    check_prov(base_path, nested=nested, single_tool=single_tool, directory=directory,
               secondary_files=secondary_files)


def check_folders(base_path):
    required_folders = [
        "data", "snapshot", "workflow", "metadata", os.path.join("metadata", "provenance")]

    for folder in required_folders:
        assert os.path.isdir(os.path.join(base_path, folder))


def check_bagit(base_path):
    # check bagit structure
    required_files = [
        "bagit.txt", "bag-info.txt", "manifest-sha1.txt",
        "tagmanifest-sha1.txt", "tagmanifest-sha256.txt"]

    for basename in required_files:
        file_path = os.path.join(base_path, basename)
        assert os.path.isfile(file_path)

    bag = bagit.Bag(base_path)
    assert bag.has_oxum()
    (only_manifest, only_fs) = bag.compare_manifests_with_fs()
    assert not list(only_manifest), "Some files only in manifest"
    assert not list(only_fs), "Some files only on file system"
    missing_tagfiles = bag.missing_optional_tagfiles()
    assert not list(missing_tagfiles), "Some files only in tagmanifest"
    bag.validate()
    # TODO: Check other bag-info attributes
    assert arcp.is_arcp_uri(bag.info.get("External-Identifier"))
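

# Editor's illustrative sketch (not part of the original test module) of the
# BagIt round-trip that check_bagit() relies on: bagit-python can turn a plain
# directory into a bag in place, then re-open and validate it afterwards.
def _bagit_roundtrip_demo(directory):
    bag = bagit.make_bag(directory)  # writes data/, manifest-*.txt, bag-info.txt
    assert bag.has_oxum()            # Payload-Oxum is filled in by make_bag
    bagit.Bag(directory).validate()  # full validation: checksums + completeness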


def find_arcp(base_path):
    # First try to find External-Identifier
    bag = bagit.Bag(base_path)
    ext_id = bag.info.get("External-Identifier")
    if arcp.is_arcp_uri(ext_id):
        return ext_id
    raise Exception("Can't find External-Identifier")


def _arcp2file(base_path, uri):
    parsed = arcp.parse_arcp(uri)
    # arcp URIs, ensure they are local to our RO
    assert parsed.uuid == arcp.parse_arcp(find_arcp(base_path)).uuid,\
        'arcp URI must be local to the research object'

    path = parsed.path[1:]  # Strip first /
    # Convert to local path, in case it uses \ on Windows
    lpath = str(Path(path))
    return os.path.join(base_path, lpath)


def check_ro(base_path, nested=False):
    manifest_file = os.path.join(base_path, "metadata", "manifest.json")
    assert os.path.isfile(manifest_file), "Can't find " + manifest_file
    arcp_root = find_arcp(base_path)
    base = urllib.parse.urljoin(arcp_root, "metadata/manifest.json")
    g = Graph()

    # Avoid resolving JSON-LD context https://w3id.org/bundle/context
    # so this test works offline
    context = Path(get_data("tests/bundle-context.jsonld")).as_uri()
    with open(manifest_file, "r", encoding="UTF-8") as f:
        jsonld = f.read()
        # replace with file:/// URI
        jsonld = jsonld.replace("https://w3id.org/bundle/context", context)
    g.parse(data=jsonld, format="json-ld", publicID=base)
    if os.environ.get("DEBUG"):
        print("Parsed manifest:\n\n")
        g.serialize(sys.stdout, format="ttl")
    ro = None

    for ro in g.subjects(ORE.isDescribedBy, URIRef(base)):
        break
    assert ro is not None, "Can't find RO with ore:isDescribedBy"

    profile = None
    for dc in g.objects(ro, DCTERMS.conformsTo):
        profile = dc
        break
    assert profile is not None, "Can't find profile with dct:conformsTo"
    assert profile == URIRef(provenance.CWLPROV_VERSION),\
        "Unexpected cwlprov version " + profile

    paths = []
    externals = []
    for aggregate in g.objects(ro, ORE.aggregates):
        if not arcp.is_arcp_uri(aggregate):
            externals.append(aggregate)
            # Won't check external URIs existence here
            # TODO: Check they are not relative!
            continue
        lfile = _arcp2file(base_path, aggregate)
        paths.append(os.path.relpath(lfile, base_path))
        assert os.path.isfile(lfile), "Can't find aggregated " + lfile

    assert paths, "Didn't find any arcp aggregates"
    assert externals, "Didn't find any data URIs"

    for ext in ["provn", "xml", "json", "jsonld", "nt", "ttl"]:
        f = "metadata/provenance/primary.cwlprov.%s" % ext
        assert f in paths, "provenance file missing " + f

    for f in ["workflow/primary-job.json", "workflow/packed.cwl", "workflow/primary-output.json"]:
        assert f in paths, "workflow file missing " + f
    # Can't test snapshot/ files directly as their name varies

    # TODO: check urn:hash::sha1 thingies
    # TODO: Check OA annotations

    packed = urllib.parse.urljoin(arcp_root, "/workflow/packed.cwl")
    primary_job = urllib.parse.urljoin(arcp_root, "/workflow/primary-job.json")
    primary_prov_nt = urllib.parse.urljoin(arcp_root, "/metadata/provenance/primary.cwlprov.nt")
    uuid = arcp.parse_arcp(arcp_root).uuid

    highlights = set(g.subjects(OA.motivatedBy, OA.highlighting))
    assert highlights, "Didn't find highlights"
    for h in highlights:
        assert (h, OA.hasTarget, URIRef(packed)) in g

    describes = set(g.subjects(OA.motivatedBy, OA.describing))
    for d in describes:
        assert (d, OA.hasBody, URIRef(arcp_root)) in g
        assert (d, OA.hasTarget, URIRef(uuid.urn)) in g

    linked = set(g.subjects(OA.motivatedBy, OA.linking))
    for l in linked:
        assert (l, OA.hasBody, URIRef(packed)) in g
        assert (l, OA.hasBody, URIRef(primary_job)) in g
        assert (l, OA.hasTarget, URIRef(uuid.urn)) in g

    has_provenance = set(g.subjects(OA.hasBody, URIRef(primary_prov_nt)))
    for p in has_provenance:
        assert (p, OA.hasTarget, URIRef(uuid.urn)) in g
        assert (p, OA.motivatedBy, PROV.has_provenance) in g
        # Check all prov elements are listed
        formats = set()
        for prov in g.objects(p, OA.hasBody):
            assert (prov, DCTERMS.conformsTo, URIRef(provenance.CWLPROV_VERSION)) in g
            # NOTE: DC.format is a Namespace method and does not resolve like other terms
            formats.update(set(g.objects(prov, DC["format"])))
        assert formats, "Could not find media types"
        expected = set(Literal(f) for f in (
            "application/json",
            "application/ld+json",
            "application/n-triples",
            'text/provenance-notation; charset="UTF-8"',
            'text/turtle; charset="UTF-8"',
            "application/xml"
        ))
        assert formats == expected, "Did not match expected PROV media types"

    if nested:
        # Check for additional PROVs
        # Let's try to find the other wf run ID
        otherRuns = set()
        for p in g.subjects(OA.motivatedBy, PROV.has_provenance):
            if (p, OA.hasTarget, URIRef(uuid.urn)) in g:
                continue
            otherRuns.update(set(g.objects(p, OA.hasTarget)))
        assert otherRuns, "Could not find nested workflow run prov annotations"
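

# Editor's illustrative sketch (not part of the original test module): the
# manifest checks above repeatedly use rdflib's triple-pattern generators
# (g.subjects(...), g.objects(...)) and take the first match with a
# for/break loop. The same idiom as a hypothetical helper:
def _first_object(graph, subject, predicate):
    """Return one object matching (subject, predicate, ?o), or None."""
    for obj in graph.objects(subject, predicate):
        return obj
    return None

# e.g. profile = _first_object(g, ro, DCTERMS.conformsTo)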


def check_prov(base_path, nested=False, single_tool=False, directory=False,
               secondary_files=False):
    prov_file = os.path.join(base_path, "metadata", "provenance", "primary.cwlprov.nt")
    assert os.path.isfile(prov_file), "Can't find " + prov_file
    arcp_root = find_arcp(base_path)
    # Note: We don't need to include metadata/provenance in base URI
    # as .nt always use absolute URIs
    g = Graph()
    with open(prov_file, "rb") as f:
        g.parse(file=f, format="nt", publicID=arcp_root)
    if os.environ.get("DEBUG"):
        print("Parsed %s:\n\n" % prov_file)
        g.serialize(sys.stdout, format="ttl")
    runs = set(g.subjects(RDF.type, WFPROV.WorkflowRun))

    # master workflow run URI (as urn:uuid:) should correspond to arcp uuid part
    uuid = arcp.parse_arcp(arcp_root).uuid
    master_run = URIRef(uuid.urn)
    assert master_run in runs, "Can't find run %s in %s" % (master_run, runs)
    # TODO: we should not need to parse arcp, but follow
    # the has_provenance annotations in manifest.json instead

    # run should have been started by a wf engine
    engines = set(g.subjects(RDF.type, WFPROV.WorkflowEngine))
    assert engines, "Could not find WorkflowEngine"
    assert len(engines) == 1, "Found too many WorkflowEngines: %s" % engines
    engine = engines.pop()

    assert (master_run, PROV.wasAssociatedWith, engine) in g, "Wf run not associated with wf engine"
    assert (engine, RDF.type, PROV.SoftwareAgent) in g, "Engine not declared as SoftwareAgent"

    if single_tool:
        activities = set(g.subjects(RDF.type, PROV.Activity))
        assert len(activities) == 1, "Too many activities: %s" % activities
        # single tool exec, there should be no other activities
        # than the tool run
        # (NOTE: the WorkflowEngine is also an activity, but not declared explicitly)
    else:
        # Check all process runs were started by the master workflow
        stepActivities = set(g.subjects(RDF.type, WFPROV.ProcessRun))
        # Although semantically a WorkflowEngine is also a ProcessRun,
        # we don't declare that,
        # thus only the step activities should be in this set.
        assert master_run not in stepActivities
        assert stepActivities, "No steps executed in workflow"
        for step in stepActivities:
            # Let's check it was started by the master_run. Unfortunately, unlike PROV-N,
            # in PROV-O RDF we have to check through the n-ary qualifiedStart relation
            starts = set(g.objects(step, PROV.qualifiedStart))
            assert starts, "Could not find qualifiedStart of step %s" % step
            assert len(starts) == 1, "Too many qualifiedStart for step %s" % step
            start = starts.pop()
            assert (start, PROV.hadActivity, master_run) in g,\
                "Step activity not started by master activity"
        # Tip: Any nested workflow step executions should not be in this prov file,
        # but in separate file

    if nested:
        # Find some cwlprov.nt the nested workflow is described in
        prov_ids = set(g.objects(predicate=PROV.has_provenance))
        # FIXME: The above is a bit naive and does not check the subject is
        # one of the steps -- OK for now as this is the only case of prov:has_provenance
        assert prov_ids, "Could not find prov:has_provenance from nested workflow"

        nt_uris = [uri for uri in prov_ids if uri.endswith("cwlprov.nt")]
        # TODO: Look up manifest conformsTo and content-type rather than assuming magic filename
        assert nt_uris, "Could not find *.cwlprov.nt"
        # Load into new graph
        g2 = Graph()
        nt_uri = nt_uris.pop()
        with open(_arcp2file(base_path, nt_uri), "rb") as f:
            g2.parse(file=f, format="nt", publicID=nt_uri)
        # TODO: Check g2 statements that it's the same UUID activity inside
        # as in the outer step

    if directory:
        directories = set(g.subjects(RDF.type, RO.Folder))
        assert directories

        for d in directories:
            assert (d, RDF.type, PROV.Dictionary) in g
            assert (d, RDF.type, PROV.Collection) in g
            assert (d, RDF.type, PROV.Entity) in g

            files = set()
            for entry in g.objects(d, PROV.hadDictionaryMember):
                assert (entry, RDF.type, PROV.KeyEntityPair) in g
                # We don't check what that filename is here
                assert set(g.objects(entry, PROV.pairKey))

                # RO:Folder aspect
                assert set(g.objects(entry, RO.entryName))
                assert (d, ORE.aggregates, entry) in g
                assert (entry, RDF.type, RO.FolderEntry) in g
                assert (entry, RDF.type, ORE.Proxy) in g
                assert (entry, ORE.proxyIn, d) in g

                # Which file?
                entities = set(g.objects(entry, PROV.pairEntity))
                assert entities
                f = entities.pop()
                files.add(f)
                assert (entry, ORE.proxyFor, f) in g
                assert (f, RDF.type, PROV.Entity) in g

            if not files:
                assert (d, RDF.type, PROV.EmptyCollection) in g
                assert (d, RDF.type, PROV.EmptyDictionary) in g

    if secondary_files:
        derivations = set(g.subjects(RDF.type, CWLPROV.SecondaryFile))
        assert derivations
        for der in derivations:
            sec = set(g.subjects(PROV.qualifiedDerivation, der)).pop()
            prim = set(g.objects(der, PROV.entity)).pop()

            # UUID specializes a hash checksum
            assert set(g.objects(sec, PROV.specializationOf))
            # extensions etc.
            sec_basename = set(g.objects(sec, CWLPROV.basename)).pop()
            sec_nameroot = set(g.objects(sec, CWLPROV.nameroot)).pop()
            sec_nameext = set(g.objects(sec, CWLPROV.nameext)).pop()
            assert str(sec_basename) == "%s%s" % (sec_nameroot, sec_nameext)
            # TODO: Check hash data file exist in RO

            # The primary entity should have the same, but different values
            assert set(g.objects(prim, PROV.specializationOf))
            prim_basename = set(g.objects(prim, CWLPROV.basename)).pop()
            prim_nameroot = set(g.objects(prim, CWLPROV.nameroot)).pop()
            prim_nameext = set(g.objects(prim, CWLPROV.nameext)).pop()
            assert str(prim_basename) == "%s%s" % (prim_nameroot, prim_nameext)


@pytest.fixture
def research_object():
    re_ob = provenance.ResearchObject(StdFsAccess(''))
    yield re_ob
    re_ob.close()


def test_absolute_path_fails(research_object):
    with pytest.raises(ValueError):
        research_object.write_bag_file("/absolute/path/fails")


def test_climboutfails(research_object):
    with pytest.raises(ValueError):
        research_object.write_bag_file("../../outside-ro")


def test_writable_string(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert file.writable()
        file.write(u"Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    sha1 = os.path.join(research_object.folder, "tagmanifest-sha1.txt")
    assert os.path.isfile(sha1)

    with open(sha1, "r", encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()
    assert stripped_sha.endswith("file.txt")
    # stain@biggie:~/src/cwltool$ echo Hello | sha1sum
    # 1d229271928d3f9e2bb0375bd6ce5db6c6d348d9 -
    assert stripped_sha.startswith("1d229271928d3f9e2bb0375bd6ce5db6c6d348d9")

    sha256 = os.path.join(research_object.folder, "tagmanifest-sha256.txt")
    assert os.path.isfile(sha256)

    with open(sha256, "r", encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()

    assert stripped_sha.endswith("file.txt")
    # stain@biggie:~/src/cwltool$ echo Hello | sha256sum
    # 66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18 -
    assert stripped_sha.startswith("66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18")

    sha512 = os.path.join(research_object.folder, "tagmanifest-sha512.txt")
    assert os.path.isfile(sha512)


def test_writable_unicode_string(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert file.writable()
        file.write(u"Here is a snowman: \u2603 \n")


def test_writable_bytes(research_object):
    string = u"Here is a snowman: \u2603 \n".encode("UTF-8")
    with research_object.write_bag_file("file.txt", encoding=None) as file:
        file.write(string)
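

# Editor's illustrative sketch (not part of the original test module;
# hypothetical simplification) of the checksum-on-write technique the
# write_bag_file() tests suggest: every write() also feeds a running digest,
# which is why the returned stream refuses seek()/read() and truncate(0)
# below -- a hash cannot be rewound.
import hashlib


class _ChecksumWriter(object):
    def __init__(self, stream):
        self.stream = stream
        self.sha1 = hashlib.sha1()

    def write(self, data):
        self.stream.write(data)
        self.sha1.update(data)  # digest tracks everything ever written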


def test_data(research_object):
    with research_object.write_bag_file("data/file.txt") as file:
        assert file.writable()
        file.write(u"Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    # Because this is under data/ it should add to manifest
    # rather than tagmanifest
    sha1 = os.path.join(research_object.folder, "manifest-sha1.txt")
    assert os.path.isfile(sha1)
    with open(sha1, "r", encoding="UTF-8") as file:
        stripped_sha = file.readline().strip()
        assert stripped_sha.endswith("data/file.txt")


def test_not_seekable(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert not file.seekable()
        with pytest.raises(IOError):
            file.seek(0)


def test_not_readable(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert not file.readable()
        with pytest.raises(IOError):
            file.read()


def test_truncate_fails(research_object):
    with research_object.write_bag_file("file.txt") as file:
        file.write(u"Hello there")
        file.truncate()  # OK as we're always at end
        # Will fail because the checksum can't rewind
        with pytest.raises(IOError):
            file.truncate(0)


mod_validness = [
    # Taken from "Some sample ORCID iDs" on
    # https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    ("0000-0002-1825-0097", True),
    ("0000-0001-5109-3700", True),
    ("0000-0002-1694-233X", True),
    # dashes optional
    ("0000000218250097", True),
    ("0000000151093700", True),
    ("000000021694233X", True),
    # do not fail on missing digits
    ("0002-1694-233X", True),
    # Swap check-digits around to force error
    ("0000-0002-1825-009X", False),
    ("0000-0001-5109-3707", False),
    ("0000-0002-1694-2330", False)
]


@pytest.mark.parametrize('mod11,valid', mod_validness)
def test_check_mod_11_2(mod11, valid):
    assert provenance._check_mod_11_2(mod11) == valid
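

# Editor's illustrative sketch (not part of the original test module): the
# checksum exercised above is ISO 7064 MOD 11-2, per ORCID's published
# algorithm -- fold each digit into a doubled running total, then derive the
# check character from the remainder mod 11, with 'X' standing for ten.
# A hypothetical reference implementation:
def _mod_11_2_check_char(base_digits):
    """Compute the MOD 11-2 check character for a string of digits."""
    total = 0
    for ch in base_digits:
        total = (total + int(ch)) * 2
    remainder = total % 11
    result = (12 - remainder) % 11
    return "X" if result == 10 else str(result)

# Sanity check against a sample iD from the list above:
# _mod_11_2_check_char("000000021825009") == "7"  # 0000-0002-1825-0097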
+ "https://orcid.org:443/0000-0002-1694-233X", + "http://orcid.org:80/0000-0002-1694-233X", + # Empty string is not really valid + "" +] + +@pytest.mark.parametrize('orcid', invalid_orcids) +def test_invalid_orcid(orcid): + with pytest.raises(ValueError): + provenance._valid_orcid(orcid) + +def test_whoami(): + username, fullname = provenance._whoami() + assert username and isinstance(username, str) + assert fullname and isinstance(fullname, str) + +def test_research_object(): + # TODO: Test ResearchObject methods + pass + +# Reasearch object may need to be pickled (for Toil) +def test_research_object_picklability(research_object): + assert pickle.dumps(research_object) is not None