-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
schema-reader: Wrap shutdown feature around flag
We'd like to allow the shutdown logic on corrutp schema be guarded by a feature flag so we do not surprise customers, this is disabled by default.
- Loading branch information
Showing
9 changed files
with
179 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
""" | ||
Copyright (c) 2024 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
|
||
from karapace.config import Config | ||
from karapace.errors import CorruptKafkaRecordException | ||
from karapace.typing import StrEnum | ||
|
||
import aiokafka.errors as Errors | ||
import enum | ||
import logging | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
||
class KafkaErrorLocation(StrEnum): | ||
SCHEMA_COORDINATOR = "SCHEMA_COORDINATOR" | ||
SCHEMA_READER = "SCHEMA_READER" | ||
|
||
|
||
class KafkaRetriableErrors(enum.Enum): | ||
SCHEMA_COORDINATOR = (Errors.NodeNotReadyError,) | ||
|
||
|
||
class KafkaErrorHandler: | ||
def __init__(self, config: Config) -> None: | ||
self.schema_reader_strict_mode: bool = config["kafka_schema_reader_strict_mode"] | ||
self.retriable_errors_silenced: bool = config["kafka_retriable_errors_silenced"] | ||
|
||
def log(self, location: KafkaErrorLocation, error: BaseException) -> None: | ||
LOG.warning("%s encountered error - %s", location, error) | ||
|
||
def handle_schema_coordinator_error(self, error: BaseException) -> None: | ||
if getattr(error, "retriable", False) or ( | ||
error in KafkaRetriableErrors[KafkaErrorLocation.SCHEMA_COORDINATOR].value | ||
): | ||
self.log(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=error) | ||
if not self.retriable_errors_silenced: | ||
raise error | ||
|
||
def handle_schema_reader_error(self, error: BaseException) -> None: | ||
if self.schema_reader_strict_mode: | ||
raise CorruptKafkaRecordException from error | ||
|
||
def handle_error(self, location: KafkaErrorLocation, error: BaseException) -> None: | ||
return getattr(self, f"handle_{location.lower()}_error")(error=error) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
""" | ||
Copyright (c) 2024 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from _pytest.logging import LogCaptureFixture | ||
from karapace.errors import CorruptKafkaRecordException | ||
from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation | ||
|
||
import aiokafka.errors as Errors | ||
import logging | ||
import pytest | ||
|
||
|
||
@pytest.fixture(name="kafka_error_handler") | ||
def fixture_kafka_error_handler() -> KafkaErrorHandler: | ||
config = { | ||
"kafka_schema_reader_strict_mode": False, | ||
"kafka_retriable_errors_silenced": True, | ||
} | ||
return KafkaErrorHandler(config=config) | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"retriable_error", | ||
[ | ||
Errors.NodeNotReadyError("node is still starting"), | ||
Errors.GroupCoordinatorNotAvailableError("group is unavailable"), | ||
Errors.NoBrokersAvailable("no brokers available"), | ||
], | ||
) | ||
def test_handle_error_retriable_schema_coordinator( | ||
caplog: LogCaptureFixture, | ||
kafka_error_handler: KafkaErrorHandler, | ||
retriable_error: Errors.KafkaError, | ||
): | ||
kafka_error_handler.retriable_errors_silenced = True | ||
with caplog.at_level(logging.WARNING, logger="karapace.error_handler"): | ||
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error) | ||
|
||
for log in caplog.records: | ||
assert log.name == "karapace.kafka_error_handler" | ||
assert log.levelname == "WARNING" | ||
assert log.message == f"SCHEMA_COORDINATOR encountered error - {retriable_error}" | ||
|
||
# Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour | ||
kafka_error_handler.retriable_errors_silenced = False | ||
with pytest.raises(retriable_error.__class__): | ||
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error) | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"nonretriable_error", | ||
[ | ||
ValueError("value missing"), | ||
Errors.GroupAuthorizationFailedError("authorization failed"), | ||
Errors.InvalidCommitOffsetSizeError("invalid commit size"), | ||
], | ||
) | ||
def test_handle_error_nonretriable_schema_coordinator( | ||
kafka_error_handler: KafkaErrorHandler, nonretriable_error: BaseException | ||
) -> None: | ||
kafka_error_handler.retriable_errors_silenced = False | ||
with pytest.raises(nonretriable_error.__class__): | ||
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error) | ||
|
||
# Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour | ||
kafka_error_handler.retriable_errors_silenced = True | ||
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error) | ||
|
||
|
||
def test_handle_error_schema_reader(kafka_error_handler: KafkaErrorHandler) -> None: | ||
kafka_error_handler.schema_reader_strict_mode = True | ||
with pytest.raises(CorruptKafkaRecordException): | ||
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception) | ||
|
||
# Check that the config flag - `kafka_schema_reader_strict_mode` switches the behaviour | ||
kafka_error_handler.schema_reader_strict_mode = False | ||
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters