diff --git a/container/compose.yml b/container/compose.yml index 511d924e8..fa2c53265 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -80,6 +80,8 @@ services: KARAPACE_COMPATIBILITY: FULL KARAPACE_STATSD_HOST: statsd-exporter KARAPACE_STATSD_PORT: 8125 + KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false + KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true karapace-rest: image: ghcr.io/aiven-open/karapace:develop @@ -106,6 +108,8 @@ services: KARAPACE_LOG_LEVEL: WARNING KARAPACE_STATSD_HOST: statsd-exporter KARAPACE_STATSD_PORT: 8125 + KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false + KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true prometheus: image: prom/prometheus diff --git a/karapace/config.py b/karapace/config.py index 894164586..bbae62701 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -80,6 +80,8 @@ class Config(TypedDict): protobuf_runtime_directory: str statsd_host: str statsd_port: int + kafka_schema_reader_strict_mode: bool + kafka_retriable_errors_silenced: bool sentry: NotRequired[Mapping[str, object]] tags: NotRequired[Mapping[str, object]] @@ -154,6 +156,8 @@ class ConfigDefaults(Config, total=False): "protobuf_runtime_directory": "runtime", "statsd_host": "127.0.0.1", "statsd_port": 8125, + "kafka_schema_reader_strict_mode": False, + "kafka_retriable_errors_silenced": True, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/coordinator/master_coordinator.py b/karapace/coordinator/master_coordinator.py index b9fdb3a12..5abd4bd31 100644 --- a/karapace/coordinator/master_coordinator.py +++ b/karapace/coordinator/master_coordinator.py @@ -50,7 +50,7 @@ async def start(self) -> None: await self._kafka_client.bootstrap() break except KafkaConnectionError: - LOG.exception("Kafka client bootstrap failed.") + LOG.warning("Kafka client bootstrap failed.") await asyncio.sleep(0.5) while not self._kafka_client.cluster.brokers(): diff --git a/karapace/kafka_error_handler.py b/karapace/kafka_error_handler.py new file mode 100644 index 000000000..4e8d87fd7 --- /dev/null +++ b/karapace/kafka_error_handler.py @@ -0,0 +1,47 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from karapace.config import Config +from karapace.errors import CorruptKafkaRecordException +from karapace.typing import StrEnum + +import aiokafka.errors as Errors +import enum +import logging + +LOG = logging.getLogger(__name__) + + +class KafkaErrorLocation(StrEnum): + SCHEMA_COORDINATOR = "SCHEMA_COORDINATOR" + SCHEMA_READER = "SCHEMA_READER" + + +class KafkaRetriableErrors(enum.Enum): + SCHEMA_COORDINATOR = (Errors.NodeNotReadyError,) + + +class KafkaErrorHandler: + def __init__(self, config: Config) -> None: + self.schema_reader_strict_mode: bool = config["kafka_schema_reader_strict_mode"] + self.retriable_errors_silenced: bool = config["kafka_retriable_errors_silenced"] + + def log(self, location: KafkaErrorLocation, error: BaseException) -> None: + LOG.warning("%s encountered error - %s", location, error) + + def handle_schema_coordinator_error(self, error: BaseException) -> None: + if getattr(error, "retriable", False) or ( + error in KafkaRetriableErrors[KafkaErrorLocation.SCHEMA_COORDINATOR].value + ): + self.log(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=error) + if not self.retriable_errors_silenced: + raise error + + def handle_schema_reader_error(self, error: BaseException) -> None: + if self.schema_reader_strict_mode: + raise CorruptKafkaRecordException from error + + def handle_error(self, location: KafkaErrorLocation, error: BaseException) -> None: + return getattr(self, f"handle_{location.lower()}_error")(error=error) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index e3884aac5..9ded95651 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -28,11 +28,12 @@ from karapace.config import Config from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency -from karapace.errors import CorruptKafkaRecordException, InvalidReferences, InvalidSchema, ShutdownException +from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import KafkaConsumer +from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.exception import ProtobufException @@ -141,6 +142,7 @@ def __init__( self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher self.stats = StatsClient(config=config) + self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config) # Thread synchronization objects # - offset is used by the REST API to wait until this thread has @@ -185,7 +187,8 @@ def run(self) -> None: LOG.warning("[Admin Client] No Brokers available yet. Retrying") self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS) except KafkaConfigurationError: - LOG.info("[Admin Client] Invalid configuration. Bailing") + LOG.warning("[Admin Client] Invalid configuration. Bailing") + self._stop_schema_reader.set() raise except Exception as e: # pylint: disable=broad-except LOG.exception("[Admin Client] Unexpected exception. Retrying") @@ -202,9 +205,9 @@ def run(self) -> None: LOG.warning("[Consumer] No Brokers available yet. Retrying") self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS) except KafkaConfigurationError: - LOG.info("[Consumer] Invalid configuration. Bailing") + LOG.warning("[Consumer] Invalid configuration. Bailing") self._stop_schema_reader.set() - shutdown() + raise except Exception as e: # pylint: disable=broad-except LOG.exception("[Consumer] Unexpected exception. Retrying") self.stats.unexpected_exception(ex=e, where="consumer_instantiation") @@ -249,7 +252,7 @@ def run(self) -> None: shutdown() except Exception as e: # pylint: disable=broad-except self.stats.unexpected_exception(ex=e, where="schema_reader_loop") - LOG.exception("Unexpected exception in schema reader loop") + LOG.warning("Unexpected exception in schema reader loop - %s", e) def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" @@ -359,7 +362,9 @@ def handle_messages(self) -> None: key = json_decode(message_key) except JSONDecodeError as exc: LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset()) - raise CorruptKafkaRecordException from exc + self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress. + self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc) + continue # [non-strict mode] except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc: LOG.error( "Kafka authorization error when consuming from %s: %s %s", @@ -367,7 +372,9 @@ def handle_messages(self) -> None: exc, msg.error(), ) - raise ShutdownException from exc + if self.kafka_error_handler.schema_reader_strict_mode: + raise ShutdownException from exc + continue assert isinstance(key, dict) msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE @@ -386,13 +393,17 @@ def handle_messages(self) -> None: value = self._parse_message_value(message_value) except (JSONDecodeError, TypeError) as exc: LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset()) - raise CorruptKafkaRecordException from exc + self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress. + self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc) + continue # [non-strict mode] try: self.handle_msg(key, value) - self.offset = msg.offset() except (InvalidSchema, TypeError) as exc: - raise CorruptKafkaRecordException from exc + self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc) + continue + finally: + self.offset = msg.offset() if msg_keymode == KeyMode.CANONICAL: schema_records_processed_keymode_canonical += 1 diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index e78c07f6a..08076dbde 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -4,7 +4,6 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from aiohttp.client_exceptions import ClientError from aiokafka.errors import InvalidTopicError from karapace.backup import api from karapace.backup.api import BackupVersion @@ -237,14 +236,9 @@ async def test_backup_restore( topic_name=api.normalize_topic_name(None, config), ) time.sleep(1.0) - - # Restoring a `v1` backup with an invalid schema stops the service as expected, but I am - # unsure why the logic mismatch, needs further investigation. - if backup_file_version == "v1": - with pytest.raises(ClientError): - await registry_async_client.get(f"subjects/{subject}/versions") - else: - await registry_async_client.get(f"subjects/{subject}/versions") + res = await registry_async_client.get(f"subjects/{subject}/versions") + assert res.status_code == 200 + assert res.json() == [1] _assert_canonical_key_format( bootstrap_servers=kafka_servers.bootstrap_servers, schemas_topic=registry_cluster.schemas_topic diff --git a/tests/unit/test_kafka_error_handler.py b/tests/unit/test_kafka_error_handler.py new file mode 100644 index 000000000..45e9fea1b --- /dev/null +++ b/tests/unit/test_kafka_error_handler.py @@ -0,0 +1,78 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from _pytest.logging import LogCaptureFixture +from karapace.errors import CorruptKafkaRecordException +from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation + +import aiokafka.errors as Errors +import logging +import pytest + + +@pytest.fixture(name="kafka_error_handler") +def fixture_kafka_error_handler() -> KafkaErrorHandler: + config = { + "kafka_schema_reader_strict_mode": False, + "kafka_retriable_errors_silenced": True, + } + return KafkaErrorHandler(config=config) + + +@pytest.mark.parametrize( + "retriable_error", + [ + Errors.NodeNotReadyError("node is still starting"), + Errors.GroupCoordinatorNotAvailableError("group is unavailable"), + Errors.NoBrokersAvailable("no brokers available"), + ], +) +def test_handle_error_retriable_schema_coordinator( + caplog: LogCaptureFixture, + kafka_error_handler: KafkaErrorHandler, + retriable_error: Errors.KafkaError, +): + kafka_error_handler.retriable_errors_silenced = True + with caplog.at_level(logging.WARNING, logger="karapace.error_handler"): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error) + + for log in caplog.records: + assert log.name == "karapace.kafka_error_handler" + assert log.levelname == "WARNING" + assert log.message == f"SCHEMA_COORDINATOR encountered error - {retriable_error}" + + # Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour + kafka_error_handler.retriable_errors_silenced = False + with pytest.raises(retriable_error.__class__): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error) + + +@pytest.mark.parametrize( + "nonretriable_error", + [ + ValueError("value missing"), + Errors.GroupAuthorizationFailedError("authorization failed"), + Errors.InvalidCommitOffsetSizeError("invalid commit size"), + ], +) +def test_handle_error_nonretriable_schema_coordinator( + kafka_error_handler: KafkaErrorHandler, nonretriable_error: BaseException +) -> None: + kafka_error_handler.retriable_errors_silenced = False + with pytest.raises(nonretriable_error.__class__): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error) + + # Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour + kafka_error_handler.retriable_errors_silenced = True + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error) + + +def test_handle_error_schema_reader(kafka_error_handler: KafkaErrorHandler) -> None: + kafka_error_handler.schema_reader_strict_mode = True + with pytest.raises(CorruptKafkaRecordException): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception) + + # Check that the config flag - `kafka_schema_reader_strict_mode` switches the behaviour + kafka_error_handler.schema_reader_strict_mode = False + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception) diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 4dbc728af..afbcbb976 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -10,7 +10,6 @@ from confluent_kafka import Message from dataclasses import dataclass from karapace.config import DEFAULTS -from karapace.errors import CorruptKafkaRecordException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import KeyFormatter @@ -200,15 +199,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche consumer_mock = Mock(spec=KafkaConsumer) schema_str = json.dumps( - { - "subject": "test", - "version": 1, - "id": 1, - "deleted": False, - "schema": json.dumps( - {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]} - ), - } + {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]} ).encode() ok1_message = Mock(spec=Message) @@ -246,16 +237,16 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche schema_reader.handle_messages() assert schema_reader.offset == 1 assert schema_reader.ready is False - - with pytest.raises(CorruptKafkaRecordException): - schema_reader.handle_messages() - assert schema_reader.offset == 1 - assert schema_reader.ready is False - - with pytest.raises(CorruptKafkaRecordException): - schema_reader.handle_messages() - assert schema_reader.offset == 1 - assert schema_reader.ready is False + schema_reader.handle_messages() + assert schema_reader.offset == 2 + assert schema_reader.ready is False + schema_reader.handle_messages() + assert schema_reader.offset == 3 + assert schema_reader.ready is False + schema_reader.handle_messages() # call last time to call _is_ready() + assert schema_reader.offset == 3 + assert schema_reader.ready is True + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP def test_soft_deleted_schema_storing() -> None: