From 260a105ce99fe0dd7436f3bfdbb2f60d59959fa5 Mon Sep 17 00:00:00 2001 From: Davide Armand Date: Thu, 3 Oct 2024 10:52:37 +0200 Subject: [PATCH 1/3] refactor: reorganize schema compatibility logic This in preparation for the fix for [EC-289]. - Move schema compatibility related code from __init__.py to its own module (schema_compatibility.py) - Refactor logic in: - Schema registering endpoint (`/subjects//versions`) - Schema compatibility endpoint (`/compatibility/subjects//versions/latest`) --- src/karapace/compatibility/__init__.py | 133 ------------------ src/karapace/schema_registry.py | 90 +++++++----- src/karapace/schema_registry_apis.py | 122 ++++++++-------- .../unit/compatibility/test_compatibility.py | 11 +- 4 files changed, 128 insertions(+), 228 deletions(-) diff --git a/src/karapace/compatibility/__init__.py b/src/karapace/compatibility/__init__.py index e5f61e710..3984ed9f5 100644 --- a/src/karapace/compatibility/__init__.py +++ b/src/karapace/compatibility/__init__.py @@ -4,22 +4,7 @@ Copyright (c) 2019 Aiven Ltd See LICENSE for details """ -from avro.compatibility import ( - merge, - ReaderWriterCompatibilityChecker as AvroChecker, - SchemaCompatibilityResult, - SchemaCompatibilityType, - SchemaIncompatibilityType, -) -from avro.schema import Schema as AvroSchema from enum import Enum, unique -from jsonschema import Draft7Validator -from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema -from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility -from karapace.protobuf.schema import ProtobufSchema -from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema -from karapace.schema_reader import SchemaType -from karapace.utils import assert_never import logging @@ -54,121 +39,3 @@ def is_transitive(self) -> bool: "FULL_TRANSITIVE", } return self.value in TRANSITIVE_MODES - - -def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: 
AvroSchema) -> SchemaCompatibilityResult: - return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema) - - -def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult: - return jsonschema_compatibility(reader, writer) - - -def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult: - return check_protobuf_schema_compatibility(reader, writer) - - -def check_compatibility( - old_schema: ParsedTypedSchema, - new_schema: ValidatedTypedSchema, - compatibility_mode: CompatibilityModes, -) -> SchemaCompatibilityResult: - """Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`.""" - if compatibility_mode is CompatibilityModes.NONE: - LOG.info("Compatibility level set to NONE, no schema compatibility checks performed") - return SchemaCompatibilityResult(SchemaCompatibilityType.compatible) - - if old_schema.schema_type is not new_schema.schema_type: - return incompatible_schema( - incompat_type=SchemaIncompatibilityType.type_mismatch, - message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}", - location=[], - ) - - if old_schema.schema_type is SchemaType.AVRO: - assert isinstance(old_schema.schema, AvroSchema) - assert isinstance(new_schema.schema, AvroSchema) - if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_avro_compatibility( - reader_schema=new_schema.schema, - writer_schema=old_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_avro_compatibility( - reader_schema=old_schema.schema, - writer_schema=new_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_avro_compatibility( - reader_schema=new_schema.schema, - writer_schema=old_schema.schema, - ) - 
result = merge( - result, - check_avro_compatibility( - reader_schema=old_schema.schema, - writer_schema=new_schema.schema, - ), - ) - - elif old_schema.schema_type is SchemaType.JSONSCHEMA: - assert isinstance(old_schema.schema, Draft7Validator) - assert isinstance(new_schema.schema, Draft7Validator) - if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_jsonschema_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_jsonschema_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_jsonschema_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - result = merge( - result, - check_jsonschema_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ), - ) - - elif old_schema.schema_type is SchemaType.PROTOBUF: - assert isinstance(old_schema.schema, ProtobufSchema) - assert isinstance(new_schema.schema, ProtobufSchema) - if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_protobuf_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_protobuf_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ) - - elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_protobuf_compatibility( - reader=new_schema.schema, - writer=old_schema.schema, - ) - result = merge( - result, - check_protobuf_compatibility( - reader=old_schema.schema, - writer=new_schema.schema, - ), - ) - - else: - assert_never(f"Unknown schema_type {old_schema.schema_type}") - - return 
result diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 67c0fc899..b98246b3e 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -4,10 +4,12 @@ """ from __future__ import annotations +from avro.compatibility import SchemaCompatibilityResult, SchemaCompatibilityType from collections.abc import Sequence from contextlib import AsyncExitStack, closing -from karapace.compatibility import check_compatibility, CompatibilityModes +from karapace.compatibility import CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible +from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.config import Config from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency @@ -281,7 +283,7 @@ async def subject_version_referencedby_get( return list(referenced_by) return [] - def _resolve_and_parse(self, schema: TypedSchema) -> ParsedTypedSchema: + def resolve_and_parse(self, schema: TypedSchema) -> ParsedTypedSchema: references, dependencies = self.resolve_references(schema.references) if schema.references else (None, None) return ParsedTypedSchema.parse( schema_type=schema.schema_type, @@ -325,12 +327,8 @@ async def write_new_schema_local( ) else: # First check if any of the existing schemas for the subject match - live_schema_versions = { - version_id: schema_version - for version_id, schema_version in all_schema_versions.items() - if schema_version.deleted is False - } - if not live_schema_versions: # Previous ones have been deleted by the user. + live_versions = self.get_live_versions_sorted(all_schema_versions) + if not live_versions: # Previous ones have been deleted by the user. 
version = self.database.get_next_version(subject=subject) schema_id = self.database.get_schema_id(new_schema) LOG.debug( @@ -351,32 +349,15 @@ async def write_new_schema_local( ) return schema_id - compatibility_mode = self.get_compatibility_mode(subject=subject) + result = self.check_schema_compatibility(new_schema, subject) - # Run a compatibility check between on file schema(s) and the one being submitted now - # the check is either towards the latest one or against all previous ones in case of - # transitive mode - schema_versions = sorted(live_schema_versions) - if compatibility_mode.is_transitive(): - check_against = schema_versions - else: - check_against = [schema_versions[-1]] - - for old_version in check_against: - parsed_old_schema = self._resolve_and_parse(all_schema_versions[old_version].schema) - result = check_compatibility( - old_schema=parsed_old_schema, - new_schema=new_schema, - compatibility_mode=compatibility_mode, + if is_incompatible(result): + message = set(result.messages).pop() if result.messages else "" + LOG.warning( + "Incompatible schema: %s, incompatibilities: %s", result.compatibility, result.incompatibilities ) - if is_incompatible(result): - message = set(result.messages).pop() if result.messages else "" - LOG.warning( - "Incompatible schema: %s, incompatibilities: %s", result.compatibility, result.incompatibilities - ) - raise IncompatibleSchema( - f"Incompatible schema, compatibility_mode={compatibility_mode.value} {message}" - ) + compatibility_mode = self.get_compatibility_mode(subject=subject) + raise IncompatibleSchema(f"Incompatible schema, compatibility_mode={compatibility_mode.value} {message}") # We didn't find an existing schema and the schema is compatible so go and create one version = self.database.get_next_version(subject=subject) @@ -465,3 +446,48 @@ def send_delete_subject_message(self, subject: Subject, version: Version) -> Non key = {"subject": subject, "magic": 0, "keytype": "DELETE_SUBJECT"} value = {"subject": 
subject, "version": version.value} self.producer.send_message(key=key, value=value) + + def check_schema_compatibility( + self, + new_schema: ValidatedTypedSchema, + subject: Subject, + ) -> SchemaCompatibilityResult: + result = SchemaCompatibilityResult(SchemaCompatibilityType.compatible) + + compatibility_mode = self.get_compatibility_mode(subject=subject) + all_schema_versions: dict[Version, SchemaVersion] = self.database.find_subject_schemas( + subject=subject, include_deleted=True + ) + live_versions = self.get_live_versions_sorted(all_schema_versions) + + if not live_versions: + old_versions = [] + elif compatibility_mode.is_transitive(): + # Only check against all versions + old_versions = live_versions + else: + # Only check against latest version + old_versions = [live_versions[-1]] + + for old_version in old_versions: + old_parsed_schema = self.resolve_and_parse(all_schema_versions[old_version].schema) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_parsed_schema, + new_schema=new_schema, + compatibility_mode=compatibility_mode, + ) + + if is_incompatible(result): + break + + return result + + @staticmethod + def get_live_versions_sorted(all_schema_versions: dict[Version, SchemaVersion]) -> list[Version]: + live_schema_versions = { + version_id: schema_version + for version_id, schema_version in all_schema_versions.items() + if schema_version.deleted is False + } + return sorted(live_schema_versions) diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index 20aaabc6f..2f57279fe 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -9,8 +9,9 @@ from enum import Enum, unique from http import HTTPStatus from karapace.auth import HTTPAuthorizer, Operation, User -from karapace.compatibility import check_compatibility, CompatibilityModes +from karapace.compatibility import CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible 
+from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.config import Config from karapace.errors import ( IncompatibleSchema, @@ -34,7 +35,7 @@ from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry -from karapace.typing import JsonData, SchemaId, Subject, Version +from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version from karapace.utils import JSONDecodeError from typing import Any @@ -380,63 +381,12 @@ def _invalid_version(self, content_type, version): ) async def compatibility_check( - self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None + self, content_type: str, *, subject: Subject, version: str, request: HTTPRequest, user: User | None = None ) -> None: """Check for schema compatibility""" self._check_authorization(user, Operation.Read, f"Subject:{subject}") - body = request.json - schema_type = self._validate_schema_type(content_type=content_type, data=body) - references = self._validate_references(content_type, schema_type, body) - try: - references, new_schema_dependencies = self.schema_registry.resolve_references(references) - new_schema = ValidatedTypedSchema.parse( - schema_type=schema_type, - schema_str=body["schema"], - references=references, - dependencies=new_schema_dependencies, - use_protobuf_formatter=self.config["use_protobuf_formatter"], - ) - except InvalidSchema: - self.r( - body={ - "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, - "message": f"Invalid {schema_type} schema", - }, - content_type=content_type, - status=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - try: - old = self.schema_registry.subject_version_get(subject=subject, version=Versioner.V(version)) - except InvalidVersion: - 
self._invalid_version(content_type, version) - except (VersionNotFoundException, SchemasNotFoundException, SubjectNotFoundException): - self.r( - body={ - "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", - }, - content_type=content_type, - status=HTTPStatus.NOT_FOUND, - ) - old_schema_type = self._validate_schema_type(content_type=content_type, data=old) - try: - old_references = old.get("references", None) - old_dependencies = None - if old_references: - old_references, old_dependencies = self.schema_registry.resolve_references(old_references) - old_schema = ParsedTypedSchema.parse(old_schema_type, old["schema"], old_references, old_dependencies) - except InvalidSchema: - self.r( - body={ - "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, - "message": f"Found an invalid {old_schema_type} schema registered", - }, - content_type=content_type, - status=HTTPStatus.UNPROCESSABLE_ENTITY, - ) - try: compatibility_mode = self.schema_registry.get_compatibility_mode(subject=subject) except ValueError as ex: @@ -451,11 +401,11 @@ async def compatibility_check( status=HTTPStatus.INTERNAL_SERVER_ERROR, ) - result = check_compatibility( - old_schema=old_schema, - new_schema=new_schema, - compatibility_mode=compatibility_mode, - ) + new_schema = self.get_new_schema(request.json, content_type) + old_schema = self._get_old_schema(subject, Versioner.V(version), content_type) + + result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) + if is_incompatible(result): self.r({"is_compatible": False}, content_type) self.r({"is_compatible": True}, content_type) @@ -1370,3 +1320,57 @@ def no_master_error(self, content_type: str) -> None: content_type=content_type, status=HTTPStatus.INTERNAL_SERVER_ERROR, ) + + def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedSchema: + schema_type = self._validate_schema_type(content_type=content_type, data=body) + references = 
self._validate_references(content_type, schema_type, body) + try: + references, new_schema_dependencies = self.schema_registry.resolve_references(references) + return ValidatedTypedSchema.parse( + schema_type=schema_type, + schema_str=body["schema"], + references=references, + dependencies=new_schema_dependencies, + use_protobuf_formatter=self.config["use_protobuf_formatter"], + ) + except InvalidSchema: + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, + "message": f"Invalid {schema_type} schema", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + def _get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema: + try: + old = self.schema_registry.subject_version_get(subject=subject, version=version) + except InvalidVersion: + self._invalid_version(content_type, version) + except (VersionNotFoundException, SchemasNotFoundException, SubjectNotFoundException): + self.r( + body={ + "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, + "message": f"Version {version} not found.", + }, + content_type=content_type, + status=HTTPStatus.NOT_FOUND, + ) + old_schema_type = self._validate_schema_type(content_type=content_type, data=old) + try: + old_references = old.get("references", None) + old_dependencies = None + if old_references: + old_references, old_dependencies = self.schema_registry.resolve_references(old_references) + old_schema = ParsedTypedSchema.parse(old_schema_type, old["schema"], old_references, old_dependencies) + return old_schema + except InvalidSchema: + self.r( + body={ + "error_code": SchemaErrorCodes.INVALID_SCHEMA.value, + "message": f"Found an invalid {old_schema_type} schema registered", + }, + content_type=content_type, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) diff --git a/tests/unit/compatibility/test_compatibility.py b/tests/unit/compatibility/test_compatibility.py index 641f7df06..76f0e22b9 100644 --- 
a/tests/unit/compatibility/test_compatibility.py +++ b/tests/unit/compatibility/test_compatibility.py @@ -3,17 +3,20 @@ See LICENSE for details """ from avro.compatibility import SchemaCompatibilityType -from karapace.compatibility import check_compatibility, CompatibilityModes +from karapace.compatibility import CompatibilityModes +from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.schema_models import SchemaType, ValidatedTypedSchema import json -def test_schema_type_can_change_when_mode_none(): +def test_schema_type_can_change_when_mode_none() -> None: avro_str = json.dumps({"type": "record", "name": "Record1", "fields": [{"name": "field1", "type": "int"}]}) - json_str = '{"type":"array"}' + json_str = '{"type": "array"}' avro_schema = ValidatedTypedSchema.parse(SchemaType.AVRO, avro_str) json_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, json_str) - result = check_compatibility(old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE) + result = SchemaCompatibility.check_compatibility( + old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE + ) assert result.compatibility is SchemaCompatibilityType.compatible From dfc476cad8891f26fe36980252142110724946d5 Mon Sep 17 00:00:00 2001 From: Davide Armand Date: Wed, 18 Sep 2024 11:12:16 +0200 Subject: [PATCH 2/3] fix: align transitive compatibility checks Make so that the transitive compatibility checks that are done in the schema creation endpoint are also done in the schema validation endpoint. In the creation endpoint (`/subjects//versions`), if the compatibility mode is transient then the new schema is checked against all schemas. In the validation endpoint (`/compatibility/subjects//versions/`): - Before this fix, only the latest schema is checked against (even in case of transitive mode) - After this fix, in case of transitive mode then all schema are checked against. 
Note that in this case the version provided in the POST request (``) is ignored. --- README.rst | 4 + .../compatibility/schema_compatibility.py | 138 ++++++++++ src/karapace/schema_registry.py | 4 +- src/karapace/schema_registry_apis.py | 13 +- .../integration/test_schema_compatibility.py | 235 ++++++++++++++++++ .../unit/compatibility/test_compatibility.py | 24 ++ tests/unit/test_schema_registry_api.py | 4 +- website/source/quickstart.rst | 4 + 8 files changed, 418 insertions(+), 8 deletions(-) create mode 100644 src/karapace/compatibility/schema_compatibility.py create mode 100644 tests/integration/test_schema_compatibility.py diff --git a/README.rst b/README.rst index f6e8a6736..0cc7489e4 100644 --- a/README.rst +++ b/README.rst @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke http://localhost:8081/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects//versions` endpoint. 
+ Get current global backwards compatibility setting value:: $ curl -X GET http://localhost:8081/config diff --git a/src/karapace/compatibility/schema_compatibility.py b/src/karapace/compatibility/schema_compatibility.py new file mode 100644 index 000000000..07e059d50 --- /dev/null +++ b/src/karapace/compatibility/schema_compatibility.py @@ -0,0 +1,138 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from avro.compatibility import ( + merge, + ReaderWriterCompatibilityChecker as AvroChecker, + SchemaCompatibilityResult, + SchemaCompatibilityType, + SchemaIncompatibilityType, +) +from avro.schema import Schema as AvroSchema +from jsonschema import Draft7Validator +from karapace.compatibility import CompatibilityModes +from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema +from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility +from karapace.protobuf.schema import ProtobufSchema +from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema +from karapace.schema_type import SchemaType +from karapace.utils import assert_never + +import logging + +LOG = logging.getLogger(__name__) + + +class SchemaCompatibility: + @staticmethod + def check_compatibility( + old_schema: ParsedTypedSchema, + new_schema: ValidatedTypedSchema, + compatibility_mode: CompatibilityModes, + ) -> SchemaCompatibilityResult: + """Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`.""" + + if compatibility_mode is CompatibilityModes.NONE: + LOG.info("Compatibility level set to NONE, no schema compatibility checks performed") + return SchemaCompatibilityResult(SchemaCompatibilityType.compatible) + + if old_schema.schema_type is not new_schema.schema_type: + return incompatible_schema( + incompat_type=SchemaIncompatibilityType.type_mismatch, + message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}", + 
location=[], + ) + + if old_schema.schema_type is SchemaType.AVRO: + assert isinstance(old_schema.schema, AvroSchema) + assert isinstance(new_schema.schema, AvroSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ), + ) + elif old_schema.schema_type is SchemaType.JSONSCHEMA: + assert isinstance(old_schema.schema, Draft7Validator) + assert isinstance(new_schema.schema, Draft7Validator) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_jsonschema_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ), + ) + elif 
old_schema.schema_type is SchemaType.PROTOBUF: + assert isinstance(old_schema.schema, ProtobufSchema) + assert isinstance(new_schema.schema, ProtobufSchema) + if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) + + elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: + result = SchemaCompatibility.check_protobuf_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + result = merge( + result, + SchemaCompatibility.check_protobuf_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ), + ) + else: + assert_never(f"Unknown schema_type {old_schema.schema_type}") + + return result + + @staticmethod + def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult: + return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema) + + @staticmethod + def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult: + return jsonschema_compatibility(reader, writer) + + @staticmethod + def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult: + return check_protobuf_schema_compatibility(reader, writer) diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index b98246b3e..ee0ae88cd 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -463,7 +463,7 @@ def check_schema_compatibility( if not live_versions: old_versions = [] elif compatibility_mode.is_transitive(): - # Only check against all versions + 
# Check against all versions old_versions = live_versions else: # Only check against latest version @@ -479,7 +479,7 @@ def check_schema_compatibility( ) if is_incompatible(result): - break + return result return result diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index 2f57279fe..f6d4593f7 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -402,9 +402,14 @@ async def compatibility_check( ) new_schema = self.get_new_schema(request.json, content_type) - old_schema = self._get_old_schema(subject, Versioner.V(version), content_type) - - result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) + old_schema = self.get_old_schema(subject, Versioner.V(version), content_type) + if compatibility_mode.is_transitive(): + # Ignore the schema version provided in the rest api call (`version`) + # Instead check against all previous versions (including `version` if existing) + result = self.schema_registry.check_schema_compatibility(new_schema, subject) + else: + # Check against the schema version provided in the rest api call (`version`) + result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) if is_incompatible(result): self.r({"is_compatible": False}, content_type) @@ -1343,7 +1348,7 @@ def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedS status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - def _get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema: + def get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ParsedTypedSchema: try: old = self.schema_registry.subject_version_get(subject=subject, version=version) except InvalidVersion: diff --git a/tests/integration/test_schema_compatibility.py b/tests/integration/test_schema_compatibility.py new file mode 100644 index 000000000..82228ba32 --- /dev/null +++ 
b/tests/integration/test_schema_compatibility.py @@ -0,0 +1,235 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from collections.abc import Coroutine +from dataclasses import dataclass +from karapace.client import Client +from karapace.typing import JsonObject, Subject +from tests.base_testcase import BaseTestCase +from typing import Any, Callable, Final + +import json +import logging +import pytest + +SchemaRegitrationFunc = Callable[[Client, Subject], Coroutine[Any, Any, None]] + +LOG = logging.getLogger(__name__) + +schema_int: Final[JsonObject] = {"type": "record", "name": "schema_name", "fields": [{"type": "int", "name": "field_name"}]} +schema_long: Final[JsonObject] = { + "type": "record", + "name": "schema_name", + "fields": [{"type": "long", "name": "field_name"}], +} +schema_string: Final[JsonObject] = { + "type": "record", + "name": "schema_name", + "fields": [{"type": "string", "name": "field_name"}], +} +schema_double: Final[JsonObject] = { + "type": "record", + "name": "schema_name", + "fields": [{"type": "double", "name": "field_name"}], +} + + +@dataclass +class SchemaCompatibilityTestCase(BaseTestCase): + new_schema: str + compatibility_mode: str + register_baseline_schemas: SchemaRegitrationFunc + expected_is_compatible: bool | None + expected_status_code: int + expected_incompatibilities: str | None + + +async def _register_baseline_schemas_no_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_int)}, + ) + assert res.status_code == 200 + + # Changing type from int to long is compatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + +async def 
_register_baseline_schemas_with_incompatibilities(registry_async_client: Client, subject: Subject) -> None: + # Allow registering non backward compatible schemas + await _set_compatibility_mode(registry_async_client, subject, "NONE") + + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_string)}, + ) + assert res.status_code == 200 + + # Changing type from string to double is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_double)}, + ) + assert res.status_code == 200 + + +async def _register_baseline_schemas_with_incompatibilities_and_a_deleted_schema( + registry_async_client: Client, subject: Subject +) -> None: + await _register_baseline_schemas_with_incompatibilities(registry_async_client, subject) + + # Register schema + # Changing type from double to long is incompatible + res = await registry_async_client.post( + f"subjects/{subject}/versions", + json={"schemaType": "AVRO", "schema": json.dumps(schema_long)}, + ) + assert res.status_code == 200 + + # And delete it + res = await registry_async_client.delete(f"subjects/{subject}/versions/latest") + assert res.status_code == 200 + assert res.json() == 3 + + +async def _register_no_baseline_schemas( + registry_async_client: Client, subject: Subject # pylint: disable=unused-argument +) -> None: + pass + + +async def _set_compatibility_mode(registry_async_client: Client, subject: Subject, compatibility_mode: str) -> None: + res = await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility_mode}) + assert res.status_code == 200 + + +@pytest.mark.parametrize( + "test_case", + [ + # Case 0 + # New schema compatible with all baseline ones (int --> long, long --> long) + # Transitive mode + # --> No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case0", + compatibility_mode="BACKWARD", + 
register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 1 + # Same as previous case, but in transitive mode + # --> No incompatibilities are found + SchemaCompatibilityTestCase( + test_name="case1", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_no_incompatibilities, + new_schema=json.dumps(schema_long), + expected_is_compatible=True, + expected_status_code=200, + expected_incompatibilities=None, + ), + # Case 2 + # New schema incompatible with both baseline schemas (string --> int, double --> int) + # Non-transitive mode + # --> Incompatibilities are found only against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case2", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 3 + # Same as previous case, but in transitive mode + # --> Incompatibilities are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case3", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 4 + # Same as case 2, but with a deleted schema among baseline ones + # Non-transitive mode + # --> The deleted schema is ignored + # --> Incompatibilities are found only against last baseline schema (double --> int) + SchemaCompatibilityTestCase( + test_name="case4", + 
compatibility_mode="BACKWARD", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: double", + ), + # Case 5 + # Same as case 3, but with a deleted schema among baseline ones + # --> The deleted schema is ignored + # --> Incompatibilities are found in the first baseline schema (string --> int) + SchemaCompatibilityTestCase( + test_name="case5", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_baseline_schemas_with_incompatibilities_and_a_deleted_schema, + new_schema=json.dumps(schema_int), + expected_is_compatible=False, + expected_status_code=200, + expected_incompatibilities="reader type: int not compatible with writer type: string", + ), + # Case 6 + # A new schema and no baseline schemas + # Non-transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exist + SchemaCompatibilityTestCase( + test_name="case6", + compatibility_mode="BACKWARD", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + # Case 7 + # Same as previous case, but in transitive mode + # --> No incompatibilities are found + # --> Status code is 404 because `latest` version to check against does not exist + SchemaCompatibilityTestCase( + test_name="case7", + compatibility_mode="BACKWARD_TRANSITIVE", + register_baseline_schemas=_register_no_baseline_schemas, + new_schema=json.dumps(schema_int), + expected_is_compatible=None, + expected_status_code=404, + expected_incompatibilities=None, + ), + ], +) +async def test_schema_compatibility(test_case: SchemaCompatibilityTestCase, registry_async_client: Client) -> None: + subject = 
Subject(f"subject_{test_case.test_name}") + + await test_case.register_baseline_schemas(registry_async_client, subject) + await _set_compatibility_mode(registry_async_client, subject, test_case.compatibility_mode) + + LOG.info("Validating new schema: %s", test_case.new_schema) + res = await registry_async_client.post( + f"compatibility/subjects/{subject}/versions/latest", json={"schema": test_case.new_schema} + ) + + assert res.status_code == test_case.expected_status_code + assert res.json().get("is_compatible") == test_case.expected_is_compatible + assert res.json().get("incompatibilities", None) == test_case.expected_incompatibilities diff --git a/tests/unit/compatibility/test_compatibility.py b/tests/unit/compatibility/test_compatibility.py index 76f0e22b9..af41aae99 100644 --- a/tests/unit/compatibility/test_compatibility.py +++ b/tests/unit/compatibility/test_compatibility.py @@ -20,3 +20,27 @@ def test_schema_type_can_change_when_mode_none() -> None: old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE ) assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_compatible_in_transitive_mode() -> None: + old_json = '{"type": "array", "name": "name_old"}' + new_json = '{"type": "array", "name": "name_new"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.compatible + + +def test_schema_incompatible_in_transitive_mode() -> None: + old_json = '{"type": "array"}' + new_json = '{"type": "integer"}' + old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json) + new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json) + + result = 
SchemaCompatibility.check_compatibility( + old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE + ) + assert result.compatibility is SchemaCompatibilityType.incompatible diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 6d850f5fc..7fcecd47e 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -14,7 +14,7 @@ import pytest -async def test_validate_schema_request_body(): +async def test_validate_schema_request_body() -> None: controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS)) controller._validate_schema_request_body( # pylint: disable=W0212 @@ -30,7 +30,7 @@ async def test_validate_schema_request_body(): assert str(exc_info.value) == "HTTPResponse 422" -async def test_forward_when_not_ready(): +async def test_forward_when_not_ready() -> None: with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) diff --git a/website/source/quickstart.rst b/website/source/quickstart.rst index 6e6ecdba6..f640e68d2 100644 --- a/website/source/quickstart.rst +++ b/website/source/quickstart.rst @@ -60,6 +60,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke $KARAPACE_REGISTRY_URI/compatibility/subjects/test-key/versions/latest {"is_compatible":true} +NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the +compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done +when trying to register the new schema through the `subjects/<subject>/versions` endpoint. 
+ Get current global backwards compatibility setting value:: $ curl -X GET $KARAPACE_REGISTRY_URI/config From 429e18c53007e61d56defc0f4bb37a61ec32240b Mon Sep 17 00:00:00 2001 From: Davide Armand Date: Thu, 26 Sep 2024 13:19:09 +0200 Subject: [PATCH 3/3] feat: return all schema validation errors --- src/karapace/schema_registry.py | 6 ++++-- src/karapace/schema_registry_apis.py | 3 ++- .../test_dependencies_compatibility_protobuf.py | 10 +++++----- tests/integration/test_schema.py | 12 ++++++++---- tests/integration/test_schema_protobuf.py | 4 ++-- 5 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index ee0ae88cd..67f58fddd 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -352,12 +352,14 @@ async def write_new_schema_local( result = self.check_schema_compatibility(new_schema, subject) if is_incompatible(result): - message = set(result.messages).pop() if result.messages else "" LOG.warning( "Incompatible schema: %s, incompatibilities: %s", result.compatibility, result.incompatibilities ) compatibility_mode = self.get_compatibility_mode(subject=subject) - raise IncompatibleSchema(f"Incompatible schema, compatibility_mode={compatibility_mode.value} {message}") + raise IncompatibleSchema( + f"Incompatible schema, compatibility_mode={compatibility_mode.value}. 
" + f"Incompatibilities: {', '.join(result.messages)[:300]}" + ) # We didn't find an existing schema and the schema is compatible so go and create one version = self.database.get_next_version(subject=subject) diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index f6d4593f7..a37a3ff9f 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -412,7 +412,8 @@ async def compatibility_check( result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode) if is_incompatible(result): - self.r({"is_compatible": False}, content_type) + maybe_truncated_error = ", ".join(result.messages)[:300] + self.r({"is_compatible": False, "incompatibilities": maybe_truncated_error}, content_type) self.r({"is_compatible": True}, content_type) async def schemas_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None): diff --git a/tests/integration/test_dependencies_compatibility_protobuf.py b/tests/integration/test_dependencies_compatibility_protobuf.py index 2bacbdf7b..725611b5c 100644 --- a/tests/integration/test_dependencies_compatibility_protobuf.py +++ b/tests/integration/test_dependencies_compatibility_protobuf.py @@ -183,7 +183,7 @@ async def test_protobuf_schema_compatibility_dependencies(registry_async_client: json={"schemaType": "PROTOBUF", "schema": evolved_schema, "references": evolved_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False @pytest.mark.parametrize("trail", ["", "/"]) @@ -271,7 +271,7 @@ async def test_protobuf_schema_compatibility_dependencies1(registry_async_client json={"schemaType": "PROTOBUF", "schema": evolved_schema, "references": evolved_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False # Do compatibility check when message field is altered from 
referenced type to google type @@ -339,7 +339,7 @@ async def test_protobuf_schema_compatibility_dependencies1g(registry_async_clien json={"schemaType": "PROTOBUF", "schema": evolved_schema}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False # Do compatibility check when message field is altered from google type to referenced type @@ -407,7 +407,7 @@ async def test_protobuf_schema_compatibility_dependencies1g_otherway(registry_as json={"schemaType": "PROTOBUF", "schema": evolved_schema, "references": container_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False @pytest.mark.parametrize("trail", ["", "/"]) @@ -491,7 +491,7 @@ async def test_protobuf_schema_compatibility_dependencies2(registry_async_client json={"schemaType": "PROTOBUF", "schema": evolved_schema, "references": evolved_references}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False SIMPLE_SCHEMA = """\ diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 546c19e0b..bb4448d80 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -332,7 +332,8 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str) json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": False} + assert res.json().get("is_compatible") is False + assert res.json().get("incompatibilities") == "reader type: string not compatible with writer type: int" @pytest.mark.parametrize("trail", ["", "/"]) @@ -536,7 +537,7 @@ def _test_cases(): json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 - assert res.json() == {"is_compatible": expected} + assert res.json().get("is_compatible") == expected @pytest.mark.parametrize("trail", ["", "/"]) 
@@ -3243,7 +3244,7 @@ async def test_schema_non_compliant_name_in_existing( json={"schema": json.dumps(evolved_schema)}, ) assert res.status_code == 200 - assert not res.json().get("is_compatible") + assert res.json().get("is_compatible") is False # Post evolved schema, should not be compatible and rejected. res = await registry_async_client.post( @@ -3253,7 +3254,10 @@ async def test_schema_non_compliant_name_in_existing( assert res.status_code == 409 assert res.json() == { "error_code": 409, - "message": "Incompatible schema, compatibility_mode=BACKWARD expected: compliant_name_test.test-schema", + "message": ( + "Incompatible schema, compatibility_mode=BACKWARD. " + "Incompatibilities: expected: compliant_name_test.test-schema" + ), } # Send compatibility configuration for subject that disabled backwards compatibility. diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index 4b4471cb2..55825fb92 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -1123,8 +1123,8 @@ async def test_protobuf_error(registry_async_client: Client) -> None: expected=409, expected_msg=( # ACTUALLY THERE NO MESSAGE_DROP!!! - "Incompatible schema, compatibility_mode=BACKWARD " - "Incompatible modification Modification.MESSAGE_DROP found" + "Incompatible schema, compatibility_mode=BACKWARD. " + "Incompatibilities: Incompatible modification Modification.MESSAGE_DROP found" ), ) print(f"Adding new schema, subject: '{testdata.subject}'\n{testdata.schema_str}")