Skip to content

Commit

Permalink
Merge pull request #121 from casework/add_case_validate_get_invalid_c…
Browse files Browse the repository at this point in the history
…do_concepts

case_validate: Expose CDO IRI typo-checker as function
  • Loading branch information
kchason authored Aug 22, 2023
2 parents e51dc87 + 1c636e0 commit 39f8f11
Showing 1 changed file with 100 additions and 60 deletions.
160 changes: 100 additions & 60 deletions case_utils/case_validate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,104 @@ def concept_is_cdo_concept(n_concept: rdflib.URIRef) -> bool:
) or concept_iri.startswith("https://ontology.caseontology.org/")


def get_invalid_cdo_concepts(
data_graph: rdflib.Graph, ontology_graph: rdflib.Graph
) -> Set[rdflib.URIRef]:
"""
Get the set of concepts in the data graph that are not part of the CDO ontologies as specified with the ontology_graph argument.
:param data_graph: The data graph to validate.
:param ontology_graph: The ontology graph to use for validation.
:return: The list of concepts in the data graph that are not part of the CDO ontology.
>>> from case_utils.namespace import NS_RDF, NS_OWL, NS_UCO_CORE
>>> from rdflib import Graph, Literal, Namespace, URIRef
>>> # Define a namespace for a knowledge base, and a namespace for custom extensions.
>>> ns_kb = Namespace("http://example.org/kb/")
>>> ns_ex = Namespace("http://example.org/ontology/")
>>> dg = Graph()
>>> og = Graph()
>>> # Use an ontology graph in review that includes only a single class and a single property excerpted from UCO, but also a single custom property.
>>> _ = og.add((NS_UCO_CORE.UcoObject, NS_RDF.type, NS_OWL.Class))
>>> _ = og.add((NS_UCO_CORE.name, NS_RDF.type, NS_OWL.DatatypeProperty))
>>> _ = og.add((ns_ex.ourCustomProperty, NS_RDF.type, NS_OWL.DatatypeProperty))
>>> # Define an individual.
>>> n_uco_object = ns_kb["UcoObject-f494d239-d9fd-48da-bc07-461ba86d8c6c"]
>>> n_uco_object
rdflib.term.URIRef('http://example.org/kb/UcoObject-f494d239-d9fd-48da-bc07-461ba86d8c6c')
>>> # Review a data graph that includes only the single individual, class typo'd (capitalized incorrectly), but property OK.
>>> _ = dg.add((n_uco_object, NS_RDF.type, NS_UCO_CORE.UCOObject))
>>> _ = dg.add((n_uco_object, NS_UCO_CORE.name, Literal("Test")))
>>> _ = dg.add((n_uco_object, ns_ex.customProperty, Literal("Custom Value")))
>>> invalid_cdo_concepts = get_invalid_cdo_concepts(dg, og)
>>> invalid_cdo_concepts
{rdflib.term.URIRef('https://ontology.unifiedcyberontology.org/uco/core/UCOObject')}
>>> # Note that the property "ourCustomProperty" was typo'd in the data graph, but this was not reported.
>>> assert ns_ex.ourCustomProperty not in invalid_cdo_concepts
"""
# Construct set of CDO concepts for data graph concept-existence review.
cdo_concepts: Set[rdflib.URIRef] = set()

for n_structural_class in [
NS_OWL.Class,
NS_OWL.AnnotationProperty,
NS_OWL.DatatypeProperty,
NS_OWL.ObjectProperty,
NS_RDFS.Datatype,
NS_SH.NodeShape,
NS_SH.PropertyShape,
NS_SH.Shape,
]:
for ontology_triple in ontology_graph.triples(
(None, NS_RDF.type, n_structural_class)
):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
if concept_is_cdo_concept(ontology_triple[0]):
cdo_concepts.add(ontology_triple[0])
for n_ontology_predicate in [
NS_OWL.backwardCompatibleWith,
NS_OWL.imports,
NS_OWL.incompatibleWith,
NS_OWL.priorVersion,
NS_OWL.versionIRI,
]:
for ontology_triple in ontology_graph.triples(
(None, n_ontology_predicate, None)
):
assert isinstance(ontology_triple[0], rdflib.URIRef)
assert isinstance(ontology_triple[2], rdflib.URIRef)
cdo_concepts.add(ontology_triple[0])
cdo_concepts.add(ontology_triple[2])
for ontology_triple in ontology_graph.triples((None, NS_RDF.type, NS_OWL.Ontology)):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
cdo_concepts.add(ontology_triple[0])

# Also load historical ontology and version IRIs.
ontology_and_version_iris_data = importlib.resources.read_text(
case_utils.ontology, "ontology_and_version_iris.txt"
)
for line in ontology_and_version_iris_data.split("\n"):
cleaned_line = line.strip()
if cleaned_line == "":
continue
cdo_concepts.add(rdflib.URIRef(cleaned_line))

data_cdo_concepts: Set[rdflib.URIRef] = set()
for data_triple in data_graph.triples((None, None, None)):
for data_triple_member in data_triple:
if isinstance(data_triple_member, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member):
data_cdo_concepts.add(data_triple_member)
elif isinstance(data_triple_member, rdflib.Literal):
if isinstance(data_triple_member.datatype, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member.datatype):
data_cdo_concepts.add(data_triple_member.datatype)

return data_cdo_concepts - cdo_concepts


def main() -> None:
parser = argparse.ArgumentParser(
description="CASE wrapper to pySHACL command line tool."
Expand Down Expand Up @@ -181,67 +279,9 @@ def main() -> None:
_logger.debug("arg_ontology_graph = %r.", arg_ontology_graph)
ontology_graph.parse(arg_ontology_graph)

# Construct set of CDO concepts for data graph concept-existence review.
cdo_concepts: Set[rdflib.URIRef] = set()

for n_structural_class in [
NS_OWL.Class,
NS_OWL.AnnotationProperty,
NS_OWL.DatatypeProperty,
NS_OWL.ObjectProperty,
NS_RDFS.Datatype,
NS_SH.NodeShape,
NS_SH.PropertyShape,
NS_SH.Shape,
]:
for ontology_triple in ontology_graph.triples(
(None, NS_RDF.type, n_structural_class)
):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
if concept_is_cdo_concept(ontology_triple[0]):
cdo_concepts.add(ontology_triple[0])
for n_ontology_predicate in [
NS_OWL.backwardCompatibleWith,
NS_OWL.imports,
NS_OWL.incompatibleWith,
NS_OWL.priorVersion,
NS_OWL.versionIRI,
]:
for ontology_triple in ontology_graph.triples(
(None, n_ontology_predicate, None)
):
assert isinstance(ontology_triple[0], rdflib.URIRef)
assert isinstance(ontology_triple[2], rdflib.URIRef)
cdo_concepts.add(ontology_triple[0])
cdo_concepts.add(ontology_triple[2])
for ontology_triple in ontology_graph.triples((None, NS_RDF.type, NS_OWL.Ontology)):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
cdo_concepts.add(ontology_triple[0])

# Also load historical ontology and version IRIs.
ontology_and_version_iris_data = importlib.resources.read_text(
case_utils.ontology, "ontology_and_version_iris.txt"
)
for line in ontology_and_version_iris_data.split("\n"):
cleaned_line = line.strip()
if cleaned_line == "":
continue
cdo_concepts.add(rdflib.URIRef(cleaned_line))

data_cdo_concepts: Set[rdflib.URIRef] = set()
for data_triple in data_graph.triples((None, None, None)):
for data_triple_member in data_triple:
if isinstance(data_triple_member, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member):
data_cdo_concepts.add(data_triple_member)
elif isinstance(data_triple_member, rdflib.Literal):
if isinstance(data_triple_member.datatype, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member.datatype):
data_cdo_concepts.add(data_triple_member.datatype)
# Get the list of undefined CDO concepts in the graph
undefined_cdo_concepts = get_invalid_cdo_concepts(data_graph, ontology_graph)

undefined_cdo_concepts = data_cdo_concepts - cdo_concepts
for undefined_cdo_concept in sorted(undefined_cdo_concepts):
warnings.warn(undefined_cdo_concept, NonExistentCDOConceptWarning)
undefined_cdo_concepts_message = (
Expand Down

0 comments on commit 39f8f11

Please sign in to comment.