Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: consume only one record at a time after startup #877

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@
KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS: Final = 2.0
SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS: Final = 5.0

# Consuming multiple records per consume round is essential for good
# startup performance.
# The consumer default is 1 message per consume call, and after startup
# that default is a good value: if the consumer expected more messages,
# it would only return control after the timeout, making the schema
# storing latency `processing time + timeout`.
MAX_MESSAGES_TO_CONSUME_ON_STARTUP: Final = 1000
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP: Final = 1
MESSAGE_CONSUME_TIMEOUT_SECONDS: Final = 0.2

# Metric names
METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT: Final = "karapace_schema_reader_records_processed"
METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE: Final = "karapace_schema_reader_records_per_keymode"
Expand Down Expand Up @@ -120,11 +130,8 @@ def __init__(
) -> None:
Thread.__init__(self, name="schema-reader")
self.master_coordinator = master_coordinator
self.timeout_s = 0.2
# Consumer default is 1 message for each consume call
# For good startup performance the consumption of multiple
# records for each consume round is essential.
self.max_messages_to_process = 1000
self.timeout_s = MESSAGE_CONSUME_TIMEOUT_SECONDS
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_ON_STARTUP
self.config = config

self.database = database
Expand Down Expand Up @@ -301,6 +308,7 @@ def _is_ready(self) -> bool:
self.startup_previous_processed_offset = self.offset
ready = self.offset >= self._highest_offset
if ready:
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
LOG.info("Ready in %s seconds", time.monotonic() - self.start_time)
return ready

Expand Down
32 changes: 31 additions & 1 deletion tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED
from karapace.schema_reader import (
KafkaSchemaReader,
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP,
MAX_MESSAGES_TO_CONSUME_ON_STARTUP,
OFFSET_EMPTY,
OFFSET_UNINITIALIZED,
)
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

Expand Down Expand Up @@ -154,3 +160,27 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None:

schema_reader.handle_messages()
assert schema_reader.ready is testcase.expected


def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
    """Check that the consume batch size drops to the after-startup value once the reader is ready."""
    stub_consumer = Mock()
    stub_consumer.configure_mock(
        **{
            "consume.return_value": [],
            # Watermarks are a (beginning, end) tuple; end is the offset of the next upcoming record.
            "get_watermark_offsets.return_value": (0, 1),
        }
    )

    reader = KafkaSchemaReader(
        config=DEFAULTS,
        offset_watcher=OffsetWatcher(),
        key_formatter=Mock(),
        master_coordinator=None,
        database=InMemoryDatabase(),
    )
    reader.consumer = stub_consumer
    reader.offset = 0

    # Before any messages are handled, the startup batch size is in effect.
    assert reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP

    reader.handle_messages()

    assert reader.ready is True
    assert reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
Loading