Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

case_validate: Expose CDO IRI typo-checker as function #121

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 100 additions & 60 deletions case_utils/case_validate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,104 @@ def concept_is_cdo_concept(n_concept: rdflib.URIRef) -> bool:
) or concept_iri.startswith("https://ontology.caseontology.org/")


def get_invalid_cdo_concepts(
data_graph: rdflib.Graph, ontology_graph: rdflib.Graph
) -> Set[rdflib.URIRef]:
"""
Get the set of concepts in the data graph that are not part of the CDO ontologies as specified with the ontology_graph argument.

:param data_graph: The data graph to validate.
:param ontology_graph: The ontology graph to use for validation.
:return: The list of concepts in the data graph that are not part of the CDO ontology.

>>> from case_utils.namespace import NS_RDF, NS_OWL, NS_UCO_CORE
>>> from rdflib import Graph, Literal, Namespace, URIRef
>>> # Define a namespace for a knowledge base, and a namespace for custom extensions.
>>> ns_kb = Namespace("http://example.org/kb/")
>>> ns_ex = Namespace("http://example.org/ontology/")
>>> dg = Graph()
>>> og = Graph()
>>> # Use an ontology graph in review that includes only a single class and a single property excerpted from UCO, but also a single custom property.
>>> _ = og.add((NS_UCO_CORE.UcoObject, NS_RDF.type, NS_OWL.Class))
>>> _ = og.add((NS_UCO_CORE.name, NS_RDF.type, NS_OWL.DatatypeProperty))
>>> _ = og.add((ns_ex.ourCustomProperty, NS_RDF.type, NS_OWL.DatatypeProperty))
>>> # Define an individual.
>>> n_uco_object = ns_kb["UcoObject-f494d239-d9fd-48da-bc07-461ba86d8c6c"]
>>> n_uco_object
rdflib.term.URIRef('http://example.org/kb/UcoObject-f494d239-d9fd-48da-bc07-461ba86d8c6c')
>>> # Review a data graph that includes only the single individual, class typo'd (capitalized incorrectly), but property OK.
>>> _ = dg.add((n_uco_object, NS_RDF.type, NS_UCO_CORE.UCOObject))
>>> _ = dg.add((n_uco_object, NS_UCO_CORE.name, Literal("Test")))
>>> _ = dg.add((n_uco_object, ns_ex.customProperty, Literal("Custom Value")))
>>> invalid_cdo_concepts = get_invalid_cdo_concepts(dg, og)
>>> invalid_cdo_concepts
{rdflib.term.URIRef('https://ontology.unifiedcyberontology.org/uco/core/UCOObject')}
>>> # Note that the property "ourCustomProperty" was typo'd in the data graph, but this was not reported.
>>> assert ns_ex.ourCustomProperty not in invalid_cdo_concepts
"""
# Construct set of CDO concepts for data graph concept-existence review.
cdo_concepts: Set[rdflib.URIRef] = set()

for n_structural_class in [
NS_OWL.Class,
NS_OWL.AnnotationProperty,
NS_OWL.DatatypeProperty,
NS_OWL.ObjectProperty,
NS_RDFS.Datatype,
NS_SH.NodeShape,
NS_SH.PropertyShape,
NS_SH.Shape,
]:
for ontology_triple in ontology_graph.triples(
(None, NS_RDF.type, n_structural_class)
):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
if concept_is_cdo_concept(ontology_triple[0]):
cdo_concepts.add(ontology_triple[0])
for n_ontology_predicate in [
NS_OWL.backwardCompatibleWith,
NS_OWL.imports,
NS_OWL.incompatibleWith,
NS_OWL.priorVersion,
NS_OWL.versionIRI,
]:
for ontology_triple in ontology_graph.triples(
(None, n_ontology_predicate, None)
):
assert isinstance(ontology_triple[0], rdflib.URIRef)
assert isinstance(ontology_triple[2], rdflib.URIRef)
cdo_concepts.add(ontology_triple[0])
cdo_concepts.add(ontology_triple[2])
for ontology_triple in ontology_graph.triples((None, NS_RDF.type, NS_OWL.Ontology)):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
cdo_concepts.add(ontology_triple[0])

# Also load historical ontology and version IRIs.
ontology_and_version_iris_data = importlib.resources.read_text(
case_utils.ontology, "ontology_and_version_iris.txt"
)
for line in ontology_and_version_iris_data.split("\n"):
cleaned_line = line.strip()
if cleaned_line == "":
continue
cdo_concepts.add(rdflib.URIRef(cleaned_line))

data_cdo_concepts: Set[rdflib.URIRef] = set()
for data_triple in data_graph.triples((None, None, None)):
for data_triple_member in data_triple:
if isinstance(data_triple_member, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member):
data_cdo_concepts.add(data_triple_member)
elif isinstance(data_triple_member, rdflib.Literal):
if isinstance(data_triple_member.datatype, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member.datatype):
data_cdo_concepts.add(data_triple_member.datatype)

return data_cdo_concepts - cdo_concepts


def main() -> None:
parser = argparse.ArgumentParser(
description="CASE wrapper to pySHACL command line tool."
Expand Down Expand Up @@ -181,67 +279,9 @@ def main() -> None:
_logger.debug("arg_ontology_graph = %r.", arg_ontology_graph)
ontology_graph.parse(arg_ontology_graph)

# Construct set of CDO concepts for data graph concept-existence review.
cdo_concepts: Set[rdflib.URIRef] = set()

for n_structural_class in [
NS_OWL.Class,
NS_OWL.AnnotationProperty,
NS_OWL.DatatypeProperty,
NS_OWL.ObjectProperty,
NS_RDFS.Datatype,
NS_SH.NodeShape,
NS_SH.PropertyShape,
NS_SH.Shape,
]:
for ontology_triple in ontology_graph.triples(
(None, NS_RDF.type, n_structural_class)
):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
if concept_is_cdo_concept(ontology_triple[0]):
cdo_concepts.add(ontology_triple[0])
for n_ontology_predicate in [
NS_OWL.backwardCompatibleWith,
NS_OWL.imports,
NS_OWL.incompatibleWith,
NS_OWL.priorVersion,
NS_OWL.versionIRI,
]:
for ontology_triple in ontology_graph.triples(
(None, n_ontology_predicate, None)
):
assert isinstance(ontology_triple[0], rdflib.URIRef)
assert isinstance(ontology_triple[2], rdflib.URIRef)
cdo_concepts.add(ontology_triple[0])
cdo_concepts.add(ontology_triple[2])
for ontology_triple in ontology_graph.triples((None, NS_RDF.type, NS_OWL.Ontology)):
if not isinstance(ontology_triple[0], rdflib.URIRef):
continue
cdo_concepts.add(ontology_triple[0])

# Also load historical ontology and version IRIs.
ontology_and_version_iris_data = importlib.resources.read_text(
case_utils.ontology, "ontology_and_version_iris.txt"
)
for line in ontology_and_version_iris_data.split("\n"):
cleaned_line = line.strip()
if cleaned_line == "":
continue
cdo_concepts.add(rdflib.URIRef(cleaned_line))

data_cdo_concepts: Set[rdflib.URIRef] = set()
for data_triple in data_graph.triples((None, None, None)):
for data_triple_member in data_triple:
if isinstance(data_triple_member, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member):
data_cdo_concepts.add(data_triple_member)
elif isinstance(data_triple_member, rdflib.Literal):
if isinstance(data_triple_member.datatype, rdflib.URIRef):
if concept_is_cdo_concept(data_triple_member.datatype):
data_cdo_concepts.add(data_triple_member.datatype)
# Get the list of undefined CDO concepts in the graph
undefined_cdo_concepts = get_invalid_cdo_concepts(data_graph, ontology_graph)

undefined_cdo_concepts = data_cdo_concepts - cdo_concepts
for undefined_cdo_concept in sorted(undefined_cdo_concepts):
warnings.warn(undefined_cdo_concept, NonExistentCDOConceptWarning)
undefined_cdo_concepts_message = (
Expand Down
Loading