Skip to content

Commit

Permalink
tests: add tests on schema compatibility checks
Browse files Browse the repository at this point in the history
[EC-289]
  • Loading branch information
davide-armand committed Sep 19, 2024
1 parent 5f01c75 commit 4879bef
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 3 deletions.
24 changes: 24 additions & 0 deletions tests/unit/compatibility/test_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,27 @@ def test_schema_type_can_change_when_mode_none() -> None:
old_schema=avro_schema, new_schema=json_schema, compatibility_mode=CompatibilityModes.NONE
)
assert result.compatibility is SchemaCompatibilityType.compatible


def test_schema_compatible_in_transitive_mode() -> None:
old_json = '{"type": "array", "name": "name_old"}'
new_json = '{"type": "array", "name": "name_new"}'
old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json)

result = SchemaCompatibility.check_compatibility(
old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE
)
assert result.compatibility is SchemaCompatibilityType.compatible


def test_schema_incompatible_in_transitive_mode() -> None:
old_json = '{"type": "array"}'
new_json = '{"type": "integer"}'
old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, old_json)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, new_json)

result = SchemaCompatibility.check_compatibility(
old_schema=old_schema, new_schema=new_schema, compatibility_mode=CompatibilityModes.FULL_TRANSITIVE
)
assert result.compatibility is SchemaCompatibilityType.incompatible
257 changes: 257 additions & 0 deletions tests/unit/test_schema_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from avro.compatibility import SchemaCompatibilityResult, SchemaCompatibilityType
from karapace.compatibility import CompatibilityModes
from karapace.compatibility.schema_compatibility import SchemaCompatibility
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.schema_models import ParsedTypedSchema, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_registry import KarapaceSchemaRegistry
from karapace.schema_type import SchemaType
from karapace.typing import Subject, Version
from unittest import mock
from unittest.mock import MagicMock, Mock

import logging

LOG = logging.getLogger(__name__)


class MockedSchemaRegistry:
def __init__(self, compatibility_mode: CompatibilityModes, has_deleted_schema: bool = False):
self.schema_registry = KarapaceSchemaRegistry(DEFAULTS)
self.compatibility_mode = compatibility_mode
self.schema_registry.get_compatibility_mode = Mock(return_value=self.compatibility_mode)

schema_version1 = Mock(spec=SchemaVersion)
schema_version1.deleted = False
typed_schema1 = Mock(spec=TypedSchema)
schema_version1.schema = typed_schema1

schema_version2 = Mock(spec=SchemaVersion)
schema_version2.deleted = has_deleted_schema
typed_schema2 = Mock(spec=TypedSchema)
schema_version2.schema = typed_schema2

schema_version3 = Mock(spec=SchemaVersion)
typed_schema3 = Mock(spec=TypedSchema)
schema_version3.deleted = False
schema_version3.schema = typed_schema3

self.schema_registry.database = Mock(spec=InMemoryDatabase)

self.schema_registry.database.find_subject_schemas = Mock(
return_value={
Version(1): schema_version1,
Version(2): schema_version2,
Version(3): schema_version3,
}
)

self.parsed_schema1 = Mock(spec=ParsedTypedSchema)
self.parsed_schema2 = Mock(spec=ParsedTypedSchema)
self.parsed_schema3 = Mock(spec=ParsedTypedSchema)

def resolve_and_parse_mock(schema: TypedSchema) -> ParsedTypedSchema:
if schema == typed_schema1:
return self.parsed_schema1
if schema == typed_schema2:
return self.parsed_schema2
if schema == typed_schema3:
return self.parsed_schema3
raise ValueError(f"Unexpected object {schema}")

self.schema_registry.resolve_and_parse = MagicMock(side_effect=resolve_and_parse_mock)

def check_schema_compatibility(
self,
new_schema: ValidatedTypedSchema,
subject: Subject,
) -> SchemaCompatibilityResult:
return self.schema_registry.check_schema_compatibility(new_schema, subject)


async def test_schema_compatible_in_transitive_mode() -> None:
# Given
schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL_TRANSITIVE)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
subject = Subject("subject")

# Don't test the actual compatibility checks
result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.compatible
SchemaCompatibility.check_compatibility = Mock(return_value=result)

# When
schema_registry.check_schema_compatibility(new_schema, subject)

# Then
assert result.compatibility is SchemaCompatibilityType.compatible

# All 3 schemas are checked against
SchemaCompatibility.check_compatibility.assert_has_calls(
[
mock.call(
old_schema=schema_registry.parsed_schema1,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
),
mock.call(
old_schema=schema_registry.parsed_schema2,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
),
mock.call(
old_schema=schema_registry.parsed_schema3,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
),
]
)


async def test_schema_incompatible_in_transitive_mode() -> None:
# Given
schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL_TRANSITIVE)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
subject = Subject("subject")

# Don't test the actual compatibility checks
result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.incompatible
SchemaCompatibility.check_compatibility = Mock(return_value=result)

# When
schema_registry.check_schema_compatibility(new_schema, subject)

# Then
assert result.compatibility is SchemaCompatibilityType.incompatible

# Only one schema is checked against (first fail stops all checks)
SchemaCompatibility.check_compatibility.assert_has_calls(
[
mock.call(
old_schema=schema_registry.parsed_schema1,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
),
]
)


async def test_schema_compatible_in_not_transitive_mode() -> None:
# Given
schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
subject = Subject("subject")

# Don't test the actual compatibility checks
result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.compatible
SchemaCompatibility.check_compatibility = Mock(return_value=result)

# When
schema_registry.check_schema_compatibility(new_schema, subject)

# Then
assert result.compatibility is SchemaCompatibilityType.compatible

# Only the last schema is checked against (not transitive)
SchemaCompatibility.check_compatibility.assert_has_calls(
[
mock.call(
old_schema=schema_registry.parsed_schema3,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
)
]
)


async def test_schema_incompatible_in_not_transitive_mode() -> None:
# Given
schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
subject = Subject("subject")

# Don't test the actual compatibility checks
result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.incompatible
SchemaCompatibility.check_compatibility = Mock(return_value=result)

# When
schema_registry.check_schema_compatibility(new_schema, subject)

# Then
assert result.compatibility is SchemaCompatibilityType.incompatible

# Only the last schema is checked against (not transitive)
SchemaCompatibility.check_compatibility.assert_has_calls(
[
mock.call(
old_schema=schema_registry.parsed_schema3,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
)
]
)


async def test_schema_compatible_with_no_live_schemas() -> None:
# Given
schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
subject = Subject("subject")

# No schemas in registry
schema_registry.schema_registry.database.find_subject_schemas = Mock(return_value={})

# Don't test the actual compatibility checks
result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.compatible
SchemaCompatibility.check_compatibility = Mock(return_value=result)

# When
schema_registry.check_schema_compatibility(new_schema, subject)

# Then
assert result.compatibility is SchemaCompatibilityType.compatible

# No check is done (no existing schemas)
SchemaCompatibility.check_compatibility.assert_not_called()


async def test_schema_compatible_in_transitive_mode_with_deleted_schema() -> None:
# Given
schema_registry = MockedSchemaRegistry(CompatibilityModes.FULL_TRANSITIVE, has_deleted_schema=True)
new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
subject = Subject("subject")

# Don't test the actual compatibility checks
result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.compatible
SchemaCompatibility.check_compatibility = Mock(return_value=result)

# When
schema_registry.check_schema_compatibility(new_schema, subject)

# Then
assert result.compatibility is SchemaCompatibilityType.compatible

# Only 2 schemas are checked against (the 3rd one is deleted)
SchemaCompatibility.check_compatibility.assert_has_calls(
[
mock.call(
old_schema=schema_registry.parsed_schema1,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
),
mock.call(
old_schema=schema_registry.parsed_schema3,
new_schema=new_schema,
compatibility_mode=schema_registry.compatibility_mode,
),
]
)
51 changes: 48 additions & 3 deletions tests/unit/test_schema_registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@
See LICENSE for details
"""
from aiohttp.test_utils import TestClient, TestServer
from avro.compatibility import SchemaCompatibilityResult, SchemaCompatibilityType
from karapace.compatibility import CompatibilityModes
from karapace.compatibility.schema_compatibility import SchemaCompatibility
from karapace.config import DEFAULTS, set_config_defaults
from karapace.rapu import HTTPResponse
from karapace.rapu import HTTPRequest, HTTPResponse
from karapace.schema_models import ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_registry import KarapaceSchemaRegistry
from karapace.schema_registry_apis import KarapaceSchemaRegistryController
from karapace.schema_type import SchemaType
from karapace.typing import Subject
from unittest.mock import ANY, AsyncMock, Mock, patch, PropertyMock

import asyncio
import pytest


async def test_validate_schema_request_body():
async def test_validate_schema_request_body() -> None:
controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS))

controller._validate_schema_request_body( # pylint: disable=W0212
Expand All @@ -30,7 +36,7 @@ async def test_validate_schema_request_body():
assert str(exc_info.value) == "HTTPResponse 422"


async def test_forward_when_not_ready():
async def test_forward_when_not_ready() -> None:
with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class:
schema_reader_mock = Mock(spec=KafkaSchemaReader)
ready_property_mock = PropertyMock(return_value=False)
Expand Down Expand Up @@ -63,3 +69,42 @@ async def test_forward_when_not_ready():
mock_forward_func.assert_called_once_with(
request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", method="GET"
)


async def test_compatibility_check_in_not_transitive_mode() -> None:
# Given
config = set_config_defaults(DEFAULTS)
config["compatibility"] = "FULL_TRANSITIVE"
controller = KarapaceSchemaRegistryController(config=config)

result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.compatible

compatibility_mode = CompatibilityModes.FULL
controller.schema_registry = Mock(spec=KarapaceSchemaRegistry)
controller.schema_registry.get_compatibility_mode = Mock(return_value=compatibility_mode)
SchemaCompatibility.check_compatibility = Mock(return_value=result)

new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
controller.get_new_schema = Mock(return_value=new_schema)

old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
controller.get_old_schema = Mock(return_value=old_schema)

request_mock = Mock(HTTPRequest)
request_mock.json = '{"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}}'

# When
with pytest.raises(HTTPResponse) as exc_info:
await controller.compatibility_check(
"application/json",
subject=Subject("subject1"),
version="1",
request=request_mock,
)

# Then only check for compatibility against the provided schema version
SchemaCompatibility.check_compatibility.assert_called_once_with(old_schema, new_schema, compatibility_mode)

assert exc_info.type is HTTPResponse
assert str(exc_info.value) == "HTTPResponse 200"

0 comments on commit 4879bef

Please sign in to comment.