Skip to content

Commit

Permalink
fix: align transitive compatibility checks
Browse files Browse the repository at this point in the history
Make it so that the transitive compatibility checks that are done in the
schema creation endpoint are also done in the schema validation
endpoint.

In the creation endpoint (`/subjects/<subject-key>/versions`), if the
compatibility mode is transitive then the new schema is checked against
all schemas.

In the validation endpoint (`/compatibility/subjects/<subject-key>/versions/<schema-version>`):
 - Before this fix, only the latest schema is checked against (even in
   case of transitive mode)
 - After this fix, in case of transitive mode all schemas are
   checked against.
   Note that in this case the version provided in the POST request
   (`<schema-version>`) is ignored.
  • Loading branch information
davide-armand committed Oct 10, 2024
1 parent 260a105 commit dfc476c
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 8 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke
http://localhost:8081/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the
compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done
when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint.

Get current global backwards compatibility setting value::

$ curl -X GET http://localhost:8081/config
Expand Down
138 changes: 138 additions & 0 deletions src/karapace/compatibility/schema_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from avro.compatibility import (
    merge,
    ReaderWriterCompatibilityChecker as AvroChecker,
    SchemaCompatibilityResult,
    SchemaCompatibilityType,
    SchemaIncompatibilityType,
)
from avro.schema import Schema as AvroSchema
from jsonschema import Draft7Validator
from karapace.compatibility import CompatibilityModes
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.utils import assert_never
from typing import Any, Callable

import logging

LOG = logging.getLogger(__name__)


class SchemaCompatibility:
    """Entry point for pairwise schema compatibility checks.

    `check_compatibility` picks the checker matching the schema type (Avro,
    JSON Schema or Protobuf) and applies it in the direction(s) required by
    the compatibility mode.
    """

    @staticmethod
    def check_compatibility(
        old_schema: ParsedTypedSchema,
        new_schema: ValidatedTypedSchema,
        compatibility_mode: CompatibilityModes,
    ) -> SchemaCompatibilityResult:
        """Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`.

        Only this single pair of schemas is compared: the transitive modes are
        handled here exactly like their non-transitive counterparts, so checking
        against several old versions is up to the caller.

        Returns a compatible result without running any check when the mode is
        NONE, and a `type_mismatch` incompatibility when the two schemas are of
        different schema types.
        """
        if compatibility_mode is CompatibilityModes.NONE:
            LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
            return SchemaCompatibilityResult(SchemaCompatibilityType.compatible)

        if old_schema.schema_type is not new_schema.schema_type:
            return incompatible_schema(
                incompat_type=SchemaIncompatibilityType.type_mismatch,
                message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}",
                location=[],
            )

        # The isinstance asserts narrow the type of `.schema` for the checker
        # that is selected for each schema type.
        if old_schema.schema_type is SchemaType.AVRO:
            assert isinstance(old_schema.schema, AvroSchema)
            assert isinstance(new_schema.schema, AvroSchema)
            return SchemaCompatibility._check_for_mode(
                SchemaCompatibility.check_avro_compatibility,
                old_schema.schema,
                new_schema.schema,
                compatibility_mode,
            )

        if old_schema.schema_type is SchemaType.JSONSCHEMA:
            assert isinstance(old_schema.schema, Draft7Validator)
            assert isinstance(new_schema.schema, Draft7Validator)
            return SchemaCompatibility._check_for_mode(
                SchemaCompatibility.check_jsonschema_compatibility,
                old_schema.schema,
                new_schema.schema,
                compatibility_mode,
            )

        if old_schema.schema_type is SchemaType.PROTOBUF:
            assert isinstance(old_schema.schema, ProtobufSchema)
            assert isinstance(new_schema.schema, ProtobufSchema)
            return SchemaCompatibility._check_for_mode(
                SchemaCompatibility.check_protobuf_compatibility,
                old_schema.schema,
                new_schema.schema,
                compatibility_mode,
            )

        assert_never(f"Unknown schema_type {old_schema.schema_type}")

    @staticmethod
    def _check_for_mode(
        check: Callable[[Any, Any], SchemaCompatibilityResult],
        old: Any,
        new: Any,
        compatibility_mode: CompatibilityModes,
    ) -> SchemaCompatibilityResult:
        """Run `check(reader, writer)` in the direction(s) implied by `compatibility_mode`.

        - BACKWARD / BACKWARD_TRANSITIVE: the new schema reads what the old one wrote.
        - FORWARD / FORWARD_TRANSITIVE: the old schema reads what the new one wrote.
        - FULL / FULL_TRANSITIVE: both directions, merged into a single result.

        Raises ValueError for any other mode (NONE is expected to be handled by
        the caller) rather than falling through without a result.
        """
        if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
            return check(new, old)

        if compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
            return check(old, new)

        if compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
            # Both directions are always evaluated (backward first) so that the
            # merged result carries the findings of each direction.
            backward_result = check(new, old)
            forward_result = check(old, new)
            return merge(backward_result, forward_result)

        raise ValueError(f"Unknown compatibility mode {compatibility_mode}")

    @staticmethod
    def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult:
        """Check whether `reader_schema` can read data written with `writer_schema` (Avro)."""
        return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema)

    @staticmethod
    def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult:
        """Delegate the reader/writer check to the JSON Schema compatibility checker."""
        return jsonschema_compatibility(reader, writer)

    @staticmethod
    def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult:
        """Delegate the reader/writer check to the Protobuf compatibility checker."""
        return check_protobuf_schema_compatibility(reader, writer)
4 changes: 2 additions & 2 deletions src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def check_schema_compatibility(
if not live_versions:
old_versions = []
elif compatibility_mode.is_transitive():
# Only check against all versions
# Check against all versions
old_versions = live_versions
else:
# Only check against latest version
Expand All @@ -479,7 +479,7 @@ def check_schema_compatibility(
)

if is_incompatible(result):
break
return result

return result

Expand Down
13 changes: 9 additions & 4 deletions src/karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,14 @@ async def compatibility_check(
)

new_schema = self.get_new_schema(request.json, content_type)
old_schema = self._get_old_schema(subject, Versioner.V(version), content_type)

result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode)
old_schema = self.get_old_schema(subject, Versioner.V(version), content_type)
if compatibility_mode.is_transitive():
# Ignore the schema version provided in the rest api call (`version`)
# Instead check against all previous versions (including `version` if existing)
result = self.schema_registry.check_schema_compatibility(new_schema, subject)
else:
# Check against the schema version provided in the rest api call (`version`)
result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode)

if is_incompatible(result):
self.r({"is_compatible": False}, content_type)
Expand Down Expand Up @@ -1343,7 +1348,7 @@ def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedS
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

def _get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema:
def get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema:
try:
old = self.schema_registry.subject_version_get(subject=subject, version=version)
except InvalidVersion:
Expand Down
Loading

0 comments on commit dfc476c

Please sign in to comment.