Skip to content

Commit

Permalink
test: store soft deleted schema
Browse files Browse the repository at this point in the history
When _schemas topic has been compacted and only soft deleted version
of the schema is present the schema must be stored in the in memory
database.
Fixes #170.
  • Loading branch information
jjaakola-aiven committed Jun 6, 2024
1 parent a189931 commit 4135724
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_reader import (
KafkaSchemaReader,
Expand All @@ -17,9 +19,12 @@
OFFSET_EMPTY,
OFFSET_UNINITIALIZED,
)
from karapace.typing import SchemaId
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

import confluent_kafka
import json
import pytest
import random
import time
Expand Down Expand Up @@ -184,3 +189,50 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
schema_reader.handle_messages()
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP


def test_soft_deleted_schema_storing() -> None:
"""This tests a case when _schemas has been compacted and only
the soft deleted version of the schema is present.
"""
key_formatter_mock = Mock(spec=KeyFormatter)
consumer_mock = Mock(spec=KafkaConsumer)
soft_deleted_schema_record = Mock(spec=confluent_kafka.Message)
soft_deleted_schema_record.error.return_value = None
soft_deleted_schema_record.key.return_value = json.dumps(
{
"keytype": "SCHEMA",
"subject": "soft-delete-test",
"version": 1,
"magic": 0,
}
)
soft_deleted_schema_record.value.return_value = json.dumps(
{
"deleted": True,
"id": 1,
"schema": '"int"',
"subject": "test-soft-delete-test",
"version": 1,
}
)

consumer_mock.consume.return_value = [soft_deleted_schema_record]
# Return tuple (beginning, end), end offset is the next upcoming record offset
consumer_mock.get_watermark_offsets.return_value = (0, 1)

offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
config=DEFAULTS,
offset_watcher=offset_watcher,
key_formatter=key_formatter_mock,
master_coordinator=None,
database=InMemoryDatabase(),
)
schema_reader.consumer = consumer_mock
schema_reader.offset = 0

schema_reader.handle_messages()

soft_deleted_stored_schema = schema_reader.database.find_schema(schema_id=SchemaId(1))
assert soft_deleted_stored_schema is not None

0 comments on commit 4135724

Please sign in to comment.