Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/prov/tests/test_rdf.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/prov/tests/test_rdf.py Fri Jul 31 00:32:28 2020 -0400 @@ -0,0 +1,246 @@ +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +import unittest +from prov.model import ProvDocument +from prov.tests.utility import RoundTripTestCase +from prov.tests.test_model import (TestStatementsBase, + TestAttributesBase, TestQualifiedNamesBase) +import os +from glob import glob +import logging +logger = logging.getLogger(__name__) + +from prov.tests import examples +import prov.model as pm + +import rdflib as rl +from rdflib.compare import graph_diff +from io import BytesIO, StringIO + + +def find_diff(g_rdf, g0_rdf): + graphs_equal = True + in_both, in_first, in_second = graph_diff(g_rdf, g0_rdf) + g1 = sorted(in_first.serialize(format='nt').splitlines())[1:] + g2 = sorted(in_second.serialize(format='nt').splitlines())[1:] + # Compare literals + if len(g1) != len(g2): + graphs_equal = False + matching_indices = [[], []] + for idx in range(len(g1)): + g1_stmt = list(rl.ConjunctiveGraph().parse(BytesIO(g1[idx]), + format='nt'))[0] + match_found = False + for idx2 in range(len(g2)): + if idx2 in matching_indices[1]: + continue + g2_stmt = list(rl.ConjunctiveGraph().parse(BytesIO(g2[idx2]), + format='nt'))[0] + try: + all_match = all([g1_stmt[i].eq(g2_stmt[i]) for i in range(3)]) + except TypeError as e: + #logger.info(e, g1_stmt, g2_stmt) + all_match = False + if all_match: + matching_indices[0].append(idx) + matching_indices[1].append(idx2) + match_found = True + break + if not match_found: + graphs_equal = False + in_first2 = rl.ConjunctiveGraph() + for idx in range(len(g1)): + if idx in matching_indices[0]: + in_both.parse(BytesIO(g1[idx]), format='nt') + else: + in_first2.parse(BytesIO(g1[idx]), format='nt') + in_second2 = rl.ConjunctiveGraph() + for idx in range(len(g2)): + if not idx in matching_indices[1]: + in_second2.parse(BytesIO(g2[idx]), format='nt') + #logger.info(in_first2) + #logger.info(in_second2) + return graphs_equal, in_both, in_first2, in_second2 + + +class TestExamplesBase(object): + """This is the base class for testing support for all the examples provided + in prov.tests.examples. + It is not runnable and needs to be included in a subclass of + RoundTripTestCase. + """ + def test_all_examples(self): + counter = 0 + for name, graph in examples.tests: + if name in ['datatypes']: + logger.info('%d. Skipping the %s example', counter, name) + continue + counter += 1 + logger.info('%d. Testing the %s example', counter, name) + g = graph() + self.do_tests(g) + + +class TestJSONExamplesBase(object): + """This is the base class for testing support for all the examples provided + in prov.tests.examples. + It is not runnable and needs to be included in a subclass of + RoundTripTestCase. + """ + def test_all_examples(self): + counter = 0 + for name, graph in examples.tests: + if name in ['datatypes']: + logger.info('%d. Skipping the %s example', counter, name) + continue + counter += 1 + logger.info('%d. Testing the %s example', counter, name) + g = graph() + self.do_tests(g) + + +class TestStatementsBase2(TestStatementsBase): + @unittest.expectedFailure + def test_scruffy_end_1(self): + TestStatementsBase.test_scruffy_end_1() + @unittest.expectedFailure + def test_scruffy_end_2(self): + TestStatementsBase.test_scruffy_end_2() + @unittest.expectedFailure + def test_scruffy_end_3(self): + TestStatementsBase.test_scruffy_end_3() + @unittest.expectedFailure + def test_scruffy_end_4(self): + TestStatementsBase.test_scruffy_end_4() + @unittest.expectedFailure + def test_scruffy_generation_1(self): + TestStatementsBase.test_scruffy_generation_1() + @unittest.expectedFailure + def test_scruffy_generation_2(self): + TestStatementsBase.test_scruffy_generation_2() + @unittest.expectedFailure + def test_scruffy_invalidation_1(self): + TestStatementsBase.test_scruffy_invalidation_1() + @unittest.expectedFailure + def test_scruffy_invalidation_2(self): + TestStatementsBase.test_scruffy_invalidation_2() + @unittest.expectedFailure + def test_scruffy_start_1(self): + TestStatementsBase.test_scruffy_start_1() + @unittest.expectedFailure + def test_scruffy_start_2(self): + TestStatementsBase.test_scruffy_start_2() + @unittest.expectedFailure + def test_scruffy_start_3(self): + TestStatementsBase.test_scruffy_start_3() + @unittest.expectedFailure + def test_scruffy_start_4(self): + TestStatementsBase.test_scruffy_start_4() + @unittest.expectedFailure + def test_scruffy_usage_1(self): + TestStatementsBase.test_scruffy_usage_1() + @unittest.expectedFailure + def test_scruffy_usage_2(self): + TestStatementsBase.test_scruffy_usage_2() + + +class TestAttributesBase2(TestAttributesBase): + @unittest.expectedFailure + def test_entity_with_multiple_attribute(self): + TestAttributesBase.test_entity_with_multiple_attribute() + @unittest.expectedFailure + def test_entity_with_multiple_value_attribute(self): + TestAttributesBase.test_entity_with_multiple_value_attribute() + @unittest.expectedFailure + def test_entity_with_one_type_attribute_8(self): + TestAttributesBase.test_entity_with_one_type_attribute_8() + + +class AllTestsBase(TestExamplesBase, + TestStatementsBase2, + TestQualifiedNamesBase, + TestAttributesBase2 + ): + """This is a test to include all available tests. + """ + pass + + +class TestRDFSerializer(unittest.TestCase): + def test_decoding_unicode_value(self): + unicode_char = u'\u2019' + rdf_content = u''' +@prefix ex: <http://www.example.org/> . +@prefix prov: <http://www.w3.org/ns/prov#> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix xml: <http://www.w3.org/XML/1998/namespace> . +@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . + + ex:unicode_char a prov:Entity ; + rdfs:label "%s"^^xsd:string . +''' % unicode_char + prov_doc = ProvDocument.deserialize(content=rdf_content, + format='rdf', rdf_format='turtle') + e1 = prov_doc.get_record('ex:unicode_char')[0] + self.assertIn(unicode_char, e1.get_attribute('prov:label')) + + def test_json_to_ttl_match(self): + json_files = sorted( + glob(os.path.join(os.path.dirname(__file__), 'json', '*.json'))) + + # invalid round trip files + skip = list(range(352, 380)) + + # invalid literal set representation e.g., set((1, True)) + skip_match = [5, 6, 7, 8, 15, 27, 28, 29, 75, 76, 77, 78, 79, 80, 260, + 261, 262, 263, 264, + 306, 313, 315, 317, 322, 323, 324, 325, 330, 332, 344, + 346, 382, 389, 395, 397, + ] + errors = [] + for idx, fname in enumerate(json_files): + _, ttl_file = os.path.split(fname) + ttl_file = os.path.join(os.path.dirname(__file__), 'rdf', + ttl_file.replace('json', 'ttl')) + try: + g = pm.ProvDocument.deserialize(fname) + if len(g.bundles) == 0: + format = 'turtle' + else: + format = 'trig' + if format == 'trig': + ttl_file = ttl_file.replace('ttl', 'trig') + + with open(ttl_file, 'rb') as fp: + g_rdf = rl.ConjunctiveGraph().parse(fp, format=format) + g0_rdf = rl.ConjunctiveGraph().parse( + StringIO(g.serialize(format='rdf', rdf_format=format)), + format=format) + if idx not in skip_match: + match, _, in_first, in_second = find_diff(g_rdf, g0_rdf) + self.assertTrue(match) + else: + logger.info('Skipping match: %s' % fname) + if idx in skip: + logger.info('Skipping deserialization: %s' % fname) + continue + g1 = pm.ProvDocument.deserialize( + content=g.serialize(format='rdf', rdf_format=format), + format='rdf', rdf_format=format) + except Exception as e: + #logger.info(e) + errors.append((e, idx, fname, in_first, in_second)) + self.assertFalse(errors) + + +class RoundTripRDFTests(RoundTripTestCase, AllTestsBase): + FORMAT = 'rdf' + +if __name__ == "__main__": + suite = unittest.TestSuite() + for method in dir(TestRDFSerializer): + if method.startswith("test"): + suite.addTest(TestRDFSerializer(method)) + unittest.TextTestRunner().run(suite) \ No newline at end of file