import json
import ntpath
import os
import posixpath
import shutil
import sys
import tempfile
from io import open
try:
    import cPickle as pickle
except ImportError:
    import pickle

from six.moves import urllib

import arcp
import pytest
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import DC, DCTERMS, RDF

import bagit
# Module to be tested
from cwltool import load_tool, provenance
from cwltool.main import main
from cwltool.resolver import Path
from cwltool.context import RuntimeContext
from cwltool.stdfsaccess import StdFsAccess

from .util import get_data, needs_docker, temp_dir, working_directory

# RDF namespaces we'll query for later
ORE = Namespace("http://www.openarchives.org/ore/terms/")
PROV = Namespace("http://www.w3.org/ns/prov#")
RO = Namespace("http://purl.org/wf4ever/ro#")
WFDESC = Namespace("http://purl.org/wf4ever/wfdesc#")
WFPROV = Namespace("http://purl.org/wf4ever/wfprov#")
SCHEMA = Namespace("http://schema.org/")
CWLPROV = Namespace("https://w3id.org/cwl/prov#")
OA = Namespace("http://www.w3.org/ns/oa#")


@pytest.fixture
def folder(tmpdir):
    directory = str(tmpdir)
    if os.environ.get("DEBUG"):
        print("%s folder: %s" % (__loader__.fullname, folder))
    yield directory

    if not os.environ.get("DEBUG"):
        shutil.rmtree(directory)


def cwltool(folder, *args):
    new_args = ['--provenance', folder]
    new_args.extend(args)
    # Run within a temporary directory to not pollute git checkout
    with temp_dir("cwltool-run") as tmp_dir:
        with working_directory(tmp_dir):
            status = main(new_args)
            assert status == 0, "Failed: cwltool.main(%r)" % (new_args,)

@needs_docker
def test_hello_workflow(folder):
    cwltool(folder, get_data('tests/wf/hello-workflow.cwl'), "--usermessage", "Hello workflow")
    check_provenance(folder)

@needs_docker
def test_hello_single_tool(folder):
    cwltool(folder, get_data('tests/wf/hello_single_tool.cwl'), "--message", "Hello tool")
    check_provenance(folder, single_tool=True)

@needs_docker
def test_revsort_workflow(folder):
    cwltool(folder, get_data('tests/wf/revsort.cwl'), get_data('tests/wf/revsort-job.json'))
    check_output_object(folder)
    check_provenance(folder)

@needs_docker
def test_nested_workflow(folder):
    cwltool(folder, get_data('tests/wf/nested.cwl'))
    check_provenance(folder, nested=True)

@needs_docker
def test_secondary_files_implicit(folder, tmpdir):
    file1 = tmpdir.join("foo1.txt")
    file1idx = tmpdir.join("foo1.txt.idx")

    with open(str(file1), "w", encoding="ascii") as f:
        f.write(u"foo")
    with open(str(file1idx), "w", encoding="ascii") as f:
        f.write(u"bar")

    # secondary will be picked up by .idx
    cwltool(folder, get_data('tests/wf/sec-wf.cwl'), "--file1", str(file1))
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)

@needs_docker
def test_secondary_files_explicit(folder, tmpdir):
    orig_tempdir = tempfile.tempdir
    tempfile.tempdir = str(tmpdir)
    # Deliberately do NOT have common basename or extension
    file1 = tempfile.mktemp("foo")
    file1idx = tempfile.mktemp("bar")

    with open(file1, "w", encoding="ascii") as f:
        f.write(u"foo")
    with open(file1idx, "w", encoding="ascii") as f:
        f.write(u"bar")

    # explicit secondaryFiles
    job = {"file1":
           {"class": "File",
            "path": file1,
            "basename": "foo1.txt",
            "secondaryFiles": [
                {
                    "class": "File",
                    "path": file1idx,
                    "basename": "foo1.txt.idx",
                }
            ]
            }
           }
    jobJson = tempfile.mktemp("job.json")
    with open(jobJson, "wb") as fp:
        j = json.dumps(job, ensure_ascii=True)
        fp.write(j.encode("ascii"))

    cwltool(folder, get_data('tests/wf/sec-wf.cwl'), jobJson)
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)
    tempfile.tempdir = orig_tempdir

@needs_docker
def test_secondary_files_output(folder):
    # secondary will be picked up by .idx
    cwltool(folder, get_data('tests/wf/sec-wf-out.cwl'))
    check_provenance(folder, secondary_files=True)
    # Skipped: this workflow does not produce the same secondary files as the
    # tests above, so check_secondary_files(folder) does not apply here

@needs_docker
def test_directory_workflow(folder, tmpdir):
    dir2 = tmpdir.join("dir2")
    os.makedirs(str(dir2))
    sha1 = {
        # Expected hashes of ASCII letters (no linefeed)
        # as returned from:
        # for x in a b c ; do echo -n $x | sha1sum ; done
        "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
        "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
        "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
    }
    for x in u"abc":
        # Make test files with predictable hashes
        with open(str(dir2.join(x)), "w", encoding="ascii") as f:
            f.write(x)

    cwltool(folder, get_data('tests/wf/directory.cwl'), "--dir", str(dir2))
    check_provenance(folder, directory=True)

    # Output should include the stdout of ls: the filenames a, b and c, one per line
    file_list = os.path.join(
        folder, "data",
        # checksum as returned from:
        # echo -e "a\nb\nc" | sha1sum
        # 3ca69e8d6c234a469d16ac28a4a658c92267c423  -
        "3c",
        "3ca69e8d6c234a469d16ac28a4a658c92267c423")
    assert os.path.isfile(file_list)

    # Input files should be captured by hash value,
    # even if they were inside a class: Directory
    for (l, l_hash) in sha1.items():
        prefix = l_hash[:2]  # first 2 characters of the checksum
        p = os.path.join(folder, "data", prefix, l_hash)
        assert os.path.isfile(p), "Could not find %s as %s" % (l, p)

def check_output_object(base_path):
    output_obj = os.path.join(base_path, "workflow", "primary-output.json")
    compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284"
    compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284"
    with open(output_obj) as fp:
        out_json = json.load(fp)
    f1 = out_json["sorted_output"]
    assert f1["checksum"] == compare_checksum
    assert f1["location"] == compare_location


def check_secondary_files(base_path):
    foo_data = os.path.join(
        base_path, "data",
        # checksum as returned from:
        # $ echo -n foo | sha1sum
        # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33  -
        "0b",
        "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
    bar_data = os.path.join(
        base_path, "data", "62", "62cdb7020ff920e5aa642c3d4066950dd1f01f4d")
    assert os.path.isfile(foo_data), "Did not capture file.txt 'foo'"
    assert os.path.isfile(bar_data), "Did not capture secondary file.txt.idx 'bar'"

    primary_job = os.path.join(base_path, "workflow", "primary-job.json")
    with open(primary_job) as fp:
        job_json = json.load(fp)
    # TODO: Verify secondaryFile in primary-job.json
    f1 = job_json["file1"]
    assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
    assert f1["basename"] == "foo1.txt"

    secondaries = f1["secondaryFiles"]
    assert secondaries
    f1idx = secondaries[0]
    assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
    assert f1idx["basename"], "foo1.txt.idx"

def check_provenance(base_path, nested=False, single_tool=False, directory=False,
                     secondary_files=False):
    check_folders(base_path)
    check_bagit(base_path)
    check_ro(base_path, nested=nested)
    check_prov(base_path, nested=nested, single_tool=single_tool, directory=directory,
               secondary_files=secondary_files)

def check_folders(base_path):
    required_folders = [
        "data", "snapshot", "workflow", "metadata", os.path.join("metadata", "provenance")]

    for folder in required_folders:
        assert os.path.isdir(os.path.join(base_path, folder))

def check_bagit(base_path):
    # check bagit structure
    required_files = [
        "bagit.txt", "bag-info.txt", "manifest-sha1.txt",
        "tagmanifest-sha1.txt", "tagmanifest-sha256.txt"]

    for basename in required_files:
        file_path = os.path.join(base_path, basename)
        assert os.path.isfile(file_path)

    bag = bagit.Bag(base_path)
    assert bag.has_oxum()
    (only_manifest, only_fs) = bag.compare_manifests_with_fs()
    assert not list(only_manifest), "Some files only in manifest"
    assert not list(only_fs), "Some files only on file system"
    missing_tagfiles = bag.missing_optional_tagfiles()
    assert not list(missing_tagfiles), "Some files only in tagmanifest"
    bag.validate()
    # TODO: Check other bag-info attributes
    assert arcp.is_arcp_uri(bag.info.get("External-Identifier"))

def find_arcp(base_path):
    # First try to find External-Identifier
    bag = bagit.Bag(base_path)
    ext_id = bag.info.get("External-Identifier")
    if arcp.is_arcp_uri(ext_id):
        return ext_id
    raise Exception("Can't find External-Identifier")

def _arcp2file(base_path, uri):
    parsed = arcp.parse_arcp(uri)
    # For arcp URIs, ensure they are local to our research object
    assert parsed.uuid == arcp.parse_arcp(find_arcp(base_path)).uuid, \
        "arcp URI must be local to the research object"

    path = parsed.path[1:]  # Strip first /
    # Convert to local path, in case it uses \ on Windows
    lpath = str(Path(path))
    return os.path.join(base_path, lpath)

def check_ro(base_path, nested=False):
    manifest_file = os.path.join(base_path, "metadata", "manifest.json")
    assert os.path.isfile(manifest_file), "Can't find " + manifest_file
    arcp_root = find_arcp(base_path)
    base = urllib.parse.urljoin(arcp_root, "metadata/manifest.json")
    g = Graph()

    # Avoid resolving JSON-LD context https://w3id.org/bundle/context
    # so this test works offline
    context = Path(get_data("tests/bundle-context.jsonld")).as_uri()
    with open(manifest_file, "r", encoding="UTF-8") as f:
        jsonld = f.read()
        # replace with file:/// URI
        jsonld = jsonld.replace("https://w3id.org/bundle/context", context)
    g.parse(data=jsonld, format="json-ld", publicID=base)
    if os.environ.get("DEBUG"):
        print("Parsed manifest:\n\n")
        g.serialize(sys.stdout, format="ttl")
    ro = None

    for ro in g.subjects(ORE.isDescribedBy, URIRef(base)):
        break
    assert ro is not None, "Can't find RO with ore:isDescribedBy"

    profile = None
    for dc in g.objects(ro, DCTERMS.conformsTo):
        profile = dc
        break
    assert profile is not None, "Can't find profile with dct:conformsTo"
    assert profile == URIRef(provenance.CWLPROV_VERSION),\
        "Unexpected cwlprov version " + profile

    paths = []
    externals = []
    for aggregate in g.objects(ro, ORE.aggregates):
        if not arcp.is_arcp_uri(aggregate):
            externals.append(aggregate)
            # Won't check that external URIs exist here
            # TODO: Check they are not relative!
            continue
        lfile = _arcp2file(base_path, aggregate)
        paths.append(os.path.relpath(lfile, base_path))
        assert os.path.isfile(lfile), "Can't find aggregated " + lfile

    assert paths, "Didn't find any arcp aggregates"
    assert externals, "Didn't find any data URIs"

    for ext in ["provn", "xml", "json", "jsonld", "nt", "ttl"]:
        f = "metadata/provenance/primary.cwlprov.%s" % ext
        assert f in paths, "provenance file missing " + f

    for f in ["workflow/primary-job.json", "workflow/packed.cwl", "workflow/primary-output.json"]:
        assert f in paths, "workflow file missing " + f
    # Can't test snapshot/ files directly as their names vary

    # TODO: check urn:hash::sha1 thingies
    # TODO: Check OA annotations

    packed = urllib.parse.urljoin(arcp_root, "/workflow/packed.cwl")
    primary_job = urllib.parse.urljoin(arcp_root, "/workflow/primary-job.json")
    primary_prov_nt = urllib.parse.urljoin(arcp_root, "/metadata/provenance/primary.cwlprov.nt")
    uuid = arcp.parse_arcp(arcp_root).uuid

    highlights = set(g.subjects(OA.motivatedBy, OA.highlighting))
    assert highlights, "Didn't find highlights"
    for h in highlights:
        assert (h, OA.hasTarget, URIRef(packed)) in g

    describes = set(g.subjects(OA.motivatedBy, OA.describing))
    for d in describes:
        assert (d, OA.hasBody, URIRef(arcp_root)) in g
        assert (d, OA.hasTarget, URIRef(uuid.urn)) in g

    linked = set(g.subjects(OA.motivatedBy, OA.linking))
    for l in linked:
        assert (l, OA.hasBody, URIRef(packed)) in g
        assert (l, OA.hasBody, URIRef(primary_job)) in g
        assert (l, OA.hasTarget, URIRef(uuid.urn)) in g

    has_provenance = set(g.subjects(OA.hasBody, URIRef(primary_prov_nt)))
    for p in has_provenance:
        assert (p, OA.hasTarget, URIRef(uuid.urn)) in g
        assert (p, OA.motivatedBy, PROV.has_provenance) in g
        # Check all prov elements are listed
        formats = set()
        for prov in g.objects(p, OA.hasBody):
            assert (prov, DCTERMS.conformsTo, URIRef(provenance.CWLPROV_VERSION)) in g
            # NOTE: DC.format is a Namespace method and does not resolve like other terms
            formats.update(set(g.objects(prov, DC["format"])))
        assert formats, "Could not find media types"
        expected = set(Literal(f) for f in (
            "application/json",
            "application/ld+json",
            "application/n-triples",
            'text/provenance-notation; charset="UTF-8"',
            'text/turtle; charset="UTF-8"',
            "application/xml"
        ))
        assert formats == expected, "Did not match expected PROV media types"

    if nested:
        # Check for additional PROVs
        # Let's try to find the other wf run ID
        otherRuns = set()
        for p in g.subjects(OA.motivatedBy, PROV.has_provenance):
            if (p, OA.hasTarget, URIRef(uuid.urn)) in g:
                continue
            otherRuns.update(set(g.objects(p, OA.hasTarget)))
        assert otherRuns, "Could not find nested workflow run prov annotations"

def check_prov(base_path, nested=False, single_tool=False, directory=False,
               secondary_files=False):
    prov_file = os.path.join(base_path, "metadata", "provenance", "primary.cwlprov.nt")
    assert os.path.isfile(prov_file), "Can't find " + prov_file
    arcp_root = find_arcp(base_path)
    # Note: We don't need to include metadata/provenance in the base URI
    # as .nt files always use absolute URIs
    g = Graph()
    with open(prov_file, "rb") as f:
        g.parse(file=f, format="nt", publicID=arcp_root)
    if os.environ.get("DEBUG"):
        print("Parsed %s:\n\n" % prov_file)
        g.serialize(sys.stdout, format="ttl")
    runs = set(g.subjects(RDF.type, WFPROV.WorkflowRun))

    # master workflow run URI (as urn:uuid:) should correspond to arcp uuid part
    uuid = arcp.parse_arcp(arcp_root).uuid
    master_run = URIRef(uuid.urn)
    assert master_run in runs, "Can't find run %s in %s" % (master_run, runs)
    # TODO: we should not need to parse arcp, but follow
    # the has_provenance annotations in manifest.json instead

    # run should have been started by a wf engine

    engines = set(g.subjects(RDF.type, WFPROV.WorkflowEngine))
    assert engines, "Could not find WorkflowEngine"
    assert len(engines) == 1, "Found too many WorkflowEngines: %s" % engines
    engine = engines.pop()

    assert (master_run, PROV.wasAssociatedWith, engine) in g, "Wf run not associated with wf engine"
    assert (engine, RDF.type, PROV.SoftwareAgent) in g, "Engine not declared as SoftwareAgent"

    if single_tool:
        activities = set(g.subjects(RDF.type, PROV.Activity))
        assert len(activities) == 1, "Too many activities: %s" % activities
        # single tool exec, there should be no other activities
        # than the tool run
        # (NOTE: the WorkflowEngine is also an activity, but not declared explicitly)
    else:
        # Check all process runs were started by the master workflow
        stepActivities = set(g.subjects(RDF.type, WFPROV.ProcessRun))
        # Although semantically a WorkflowEngine is also a ProcessRun,
        # we don't declare that,
        # thus only the step activities should be in this set.
        assert master_run not in stepActivities
        assert stepActivities, "No steps executed in workflow"
        for step in stepActivities:
            # Check the step was started by the master_run. Unfortunately, unlike
            # in PROV-N, in PROV-O RDF we have to go through the n-ary
            # qualifiedStart relation
            starts = set(g.objects(step, PROV.qualifiedStart))
            assert starts, "Could not find qualifiedStart of step %s" % step
            assert len(starts) == 1, "Too many qualifiedStart for step %s" % step
            start = starts.pop()
            assert (start, PROV.hadActivity, master_run) in g,\
                "Step activity not started by master activity"
            # Tip: Any nested workflow step executions should not be in this
            # prov file, but in a separate file
    if nested:
        # Find the cwlprov.nt file(s) the nested workflow is described in
        prov_ids = set(g.objects(predicate=PROV.has_provenance))
        # FIXME: The above is a bit naive and does not check the subject is
        # one of the steps -- OK for now as this is the only case of prov:has_provenance
        assert prov_ids, "Could not find prov:has_provenance from nested workflow"

        nt_uris = [uri for uri in prov_ids if uri.endswith("cwlprov.nt")]
        # TODO: Look up manifest conformsTo and content-type rather than assuming magic filename
        assert nt_uris, "Could not find *.cwlprov.nt"
        # Load into new graph
        g2 = Graph()
        nt_uri = nt_uris.pop()
        with open(_arcp2file(base_path, nt_uri), "rb") as f:
            g2.parse(file=f, format="nt", publicID=nt_uri)
        # TODO: Check g2 statements that it's the same UUID activity inside
        # as in the outer step
    if directory:
        directories = set(g.subjects(RDF.type, RO.Folder))
        assert directories

        for d in directories:
            assert (d, RDF.type, PROV.Dictionary) in g
            assert (d, RDF.type, PROV.Collection) in g
            assert (d, RDF.type, PROV.Entity) in g

            files = set()
            for entry in g.objects(d, PROV.hadDictionaryMember):
                assert (entry, RDF.type, PROV.KeyEntityPair) in g
                # We don't check what that filename is here
                assert set(g.objects(entry, PROV.pairKey))

                # RO:Folder aspect
                assert set(g.objects(entry, RO.entryName))
                assert (d, ORE.aggregates, entry) in g
                assert (entry, RDF.type, RO.FolderEntry) in g
                assert (entry, RDF.type, ORE.Proxy) in g
                assert (entry, ORE.proxyIn, d) in g

                # Which file?
                entities = set(g.objects(entry, PROV.pairEntity))
                assert entities
                f = entities.pop()
                files.add(f)
                assert (entry, ORE.proxyFor, f) in g
                assert (f, RDF.type, PROV.Entity) in g

            if not files:
                assert (d, RDF.type, PROV.EmptyCollection) in g
                assert (d, RDF.type, PROV.EmptyDictionary) in g
    if secondary_files:
        derivations = set(g.subjects(RDF.type, CWLPROV.SecondaryFile))
        assert derivations
        for der in derivations:
            sec = set(g.subjects(PROV.qualifiedDerivation, der)).pop()
            prim = set(g.objects(der, PROV.entity)).pop()

            # The UUID-identified entity should specialize a hash checksum entity
            assert set(g.objects(sec, PROV.specializationOf))
            # extensions etc.
            sec_basename = set(g.objects(sec, CWLPROV.basename)).pop()
            sec_nameroot = set(g.objects(sec, CWLPROV.nameroot)).pop()
            sec_nameext = set(g.objects(sec, CWLPROV.nameext)).pop()
            assert str(sec_basename) == "%s%s" % (sec_nameroot, sec_nameext)
            # TODO: Check hash data file exist in RO

            # The primary entity should have the same properties, but with different values
            assert set(g.objects(prim, PROV.specializationOf))
            prim_basename = set(g.objects(prim, CWLPROV.basename)).pop()
            prim_nameroot = set(g.objects(prim, CWLPROV.nameroot)).pop()
            prim_nameext = set(g.objects(prim, CWLPROV.nameext)).pop()
            assert str(prim_basename) == "%s%s" % (prim_nameroot, prim_nameext)


@pytest.fixture
def research_object():
    re_ob = provenance.ResearchObject(StdFsAccess(''))
    yield re_ob
    re_ob.close()

def test_absolute_path_fails(research_object):
    with pytest.raises(ValueError):
        research_object.write_bag_file("/absolute/path/fails")

def test_climboutfails(research_object):
    with pytest.raises(ValueError):
        research_object.write_bag_file("../../outside-ro")

def test_writable_string(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert file.writable()
        file.write(u"Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    sha1 = os.path.join(research_object.folder, "tagmanifest-sha1.txt")
    assert os.path.isfile(sha1)

    with open(sha1, "r", encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()
    assert stripped_sha.endswith("file.txt")
    #stain@biggie:~/src/cwltool$ echo Hello | sha1sum
    #1d229271928d3f9e2bb0375bd6ce5db6c6d348d9  -
    assert stripped_sha.startswith("1d229271928d3f9e2bb0375bd6ce5db6c6d348d9")

    sha256 = os.path.join(research_object.folder, "tagmanifest-sha256.txt")
    assert os.path.isfile(sha256)

    with open(sha256, "r", encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()

    assert stripped_sha.endswith("file.txt")
    #stain@biggie:~/src/cwltool$ echo Hello | sha256sum
    #66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18  -
    assert stripped_sha.startswith("66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18")

    sha512 = os.path.join(research_object.folder, "tagmanifest-sha512.txt")
    assert os.path.isfile(sha512)

def test_writable_unicode_string(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert file.writable()
        file.write(u"Here is a snowman: \u2603 \n")

def test_writable_bytes(research_object):
    string = u"Here is a snowman: \u2603 \n".encode("UTF-8")
    with research_object.write_bag_file("file.txt", encoding=None) as file:
        file.write(string)

def test_data(research_object):
    with research_object.write_bag_file("data/file.txt") as file:
        assert file.writable()
        file.write(u"Hello\n")
    # TODO: Check Windows does not modify \n to \r\n here

    # Because this is under data/ it should add to manifest
    # rather than tagmanifest
    sha1 = os.path.join(research_object.folder, "manifest-sha1.txt")
    assert os.path.isfile(sha1)
    with open(sha1, "r", encoding="UTF-8") as file:
        stripped_sha = file.readline().strip()
        assert stripped_sha.endswith("data/file.txt")

def test_not_seekable(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert not file.seekable()
        with pytest.raises(IOError):
            file.seek(0)

def test_not_readable(research_object):
    with research_object.write_bag_file("file.txt") as file:
        assert not file.readable()
        with pytest.raises(IOError):
            file.read()

def test_truncate_fails(research_object):
    with research_object.write_bag_file("file.txt") as file:
        file.write(u"Hello there")
        file.truncate()  # OK as we're always at end
        # Will fail because the checksum can't rewind
        with pytest.raises(IOError):
            file.truncate(0)


mod_validness = [
    # Taken from "Some sample ORCID iDs" on
    # https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    ("0000-0002-1825-0097", True),
    ("0000-0001-5109-3700", True),
    ("0000-0002-1694-233X", True),
    # dashes optional
    ("0000000218250097", True),
    ("0000000151093700", True),
    ("000000021694233X", True),
    # do not fail on missing digits
    ("0002-1694-233X", True),
    # Swap check-digits around to force error
    ("0000-0002-1825-009X", False),
    ("0000-0001-5109-3707", False),
    ("0000-0002-1694-2330", False)
]

@pytest.mark.parametrize('mod11,valid', mod_validness)
def test_check_mod_11_2(mod11, valid):
    assert provenance._check_mod_11_2(mod11) == valid
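

# For illustration only: a minimal sketch of the ISO/IEC 7064 mod 11-2 check that
# the cases above encode. This is NOT cwltool's implementation (that lives in
# cwltool.provenance._check_mod_11_2); the helper below is a hypothetical
# reference for readers of the test data.
def _mod_11_2_sketch(identifier):
    digits = identifier.replace("-", "")
    total = 0
    for ch in digits[:-1]:
        # Accumulate over all digits except the final check character
        total = (total + int(ch)) * 2
    expected = (12 - total % 11) % 11
    given = 10 if digits[-1] in ("X", "x") else int(digits[-1])
    return given == expected
# e.g. _mod_11_2_sketch("0000-0002-1825-0097") is True,
#      _mod_11_2_sketch("0000-0002-1825-009X") is False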


orcid_uris = [
    # https://orcid.org/ (Expected form)
    ("https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # orcid.org
    ("http://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # just the number
    ("0000-0002-1825-0097", "https://orcid.org/0000-0002-1825-0097"),
    # lower-case X is OK (and fixed)
    ("https://orcid.org/0000-0002-1694-233x", "https://orcid.org/0000-0002-1694-233X"),
    # upper-case ORCID.ORG is OK.. (and fixed)
    ("https://ORCID.ORG/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # Unicode string (Python 2)
    (u"https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X")
]

@pytest.mark.parametrize('orcid,expected', orcid_uris)
def test_valid_orcid(orcid, expected):
    assert provenance._valid_orcid(orcid) == expected
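

# Illustrative only: the normalization the cases above expect can be sketched along
# these lines. The helper name and regex are assumptions, not cwltool's
# _valid_orcid(); a real implementation would also verify the mod 11-2 check digit
# (see the sketch above) before accepting the iD.
def _normalize_orcid_sketch(orcid):
    import re  # local import to keep this sketch self-contained
    match = re.match(
        r"(?:https?://orcid\.org/)?(\d{4}-\d{4}-\d{4}-\d{3}[\dXx])$",
        orcid, re.IGNORECASE)
    if not match:
        raise ValueError("Invalid ORCID iD: %r" % orcid)
    # Canonical form: https://orcid.org/ prefix plus upper-cased check character
    return "https://orcid.org/" + match.group(1).upper()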


invalid_orcids = [
    # missing digit fails (even if checksum is correct)
    "0002-1694-2332",
    # Wrong checkdigit fails
    "https://orcid.org/0000-0002-1694-2332",
    "0000-0002-1694-2332",
    # Missing dashes fails (although that's OK for checksum)
    "https://orcid.org/000000021694233X",
    "000000021694233X",
    # Wrong hostname fails
    "https://example.org/0000-0002-1694-233X",
    # Wrong protocol fails
    "ftp://orcid.org/0000-0002-1694-233X",
    # Trying to be clever fails (no URL parsing!)
    "https://orcid.org:443/0000-0002-1694-233X",
    "http://orcid.org:80/0000-0002-1694-233X",
    # Empty string is not really valid
    ""
]

@pytest.mark.parametrize('orcid', invalid_orcids)
def test_invalid_orcid(orcid):
    with pytest.raises(ValueError):
        provenance._valid_orcid(orcid)

def test_whoami():
    username, fullname = provenance._whoami()
    assert username and isinstance(username, str)
    assert fullname and isinstance(fullname, str)

def test_research_object():
    # TODO: Test ResearchObject methods
    pass

# Research objects may need to be pickled (for Toil)
def test_research_object_picklability(research_object):
    assert pickle.dumps(research_object) is not None