Skip to content

Commit

Permalink
schema-reader: Wrap shutdown feature around flag
Browse files Browse the repository at this point in the history
We'd like to allow the shutdown logic on corrutp schema be guarded by a feature
flag so we do not surprise customers, this is disabled by default.
  • Loading branch information
nosahama committed Aug 30, 2024
1 parent 1cdd0f1 commit 2e716a4
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 40 deletions.
4 changes: 4 additions & 0 deletions container/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ services:
KARAPACE_COMPATIBILITY: FULL
KARAPACE_STATSD_HOST: statsd-exporter
KARAPACE_STATSD_PORT: 8125
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

karapace-rest:
image: ghcr.io/aiven-open/karapace:develop
Expand All @@ -106,6 +108,8 @@ services:
KARAPACE_LOG_LEVEL: WARNING
KARAPACE_STATSD_HOST: statsd-exporter
KARAPACE_STATSD_PORT: 8125
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

prometheus:
image: prom/prometheus
Expand Down
4 changes: 4 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class Config(TypedDict):
protobuf_runtime_directory: str
statsd_host: str
statsd_port: int
kafka_schema_reader_strict_mode: bool
kafka_retriable_errors_silenced: bool

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -154,6 +156,8 @@ class ConfigDefaults(Config, total=False):
"protobuf_runtime_directory": "runtime",
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"kafka_schema_reader_strict_mode": False,
"kafka_retriable_errors_silenced": True,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
2 changes: 1 addition & 1 deletion karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def start(self) -> None:
await self._kafka_client.bootstrap()
break
except KafkaConnectionError:
LOG.exception("Kafka client bootstrap failed.")
LOG.warning("Kafka client bootstrap failed.")
await asyncio.sleep(0.5)

while not self._kafka_client.cluster.brokers():
Expand Down
47 changes: 47 additions & 0 deletions karapace/kafka_error_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

from karapace.config import Config
from karapace.errors import CorruptKafkaRecordException
from karapace.typing import StrEnum

import aiokafka.errors as Errors
import enum
import logging

LOG = logging.getLogger(__name__)


class KafkaErrorLocation(StrEnum):
SCHEMA_COORDINATOR = "SCHEMA_COORDINATOR"
SCHEMA_READER = "SCHEMA_READER"


class KafkaRetriableErrors(enum.Enum):
SCHEMA_COORDINATOR = (Errors.NodeNotReadyError,)


class KafkaErrorHandler:
def __init__(self, config: Config) -> None:
self.schema_reader_strict_mode: bool = config["kafka_schema_reader_strict_mode"]
self.retriable_errors_silenced: bool = config["kafka_retriable_errors_silenced"]

def log(self, location: KafkaErrorLocation, error: BaseException) -> None:
LOG.warning("%s encountered error - %s", location, error)

def handle_schema_coordinator_error(self, error: BaseException) -> None:
if getattr(error, "retriable", False) or (
error in KafkaRetriableErrors[KafkaErrorLocation.SCHEMA_COORDINATOR].value
):
self.log(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=error)
if not self.retriable_errors_silenced:
raise error

def handle_schema_reader_error(self, error: BaseException) -> None:
if self.schema_reader_strict_mode:
raise CorruptKafkaRecordException from error

def handle_error(self, location: KafkaErrorLocation, error: BaseException) -> None:
return getattr(self, f"handle_{location.lower()}_error")(error=error)
31 changes: 21 additions & 10 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
from karapace.config import Config
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.dependency import Dependency
from karapace.errors import CorruptKafkaRecordException, InvalidReferences, InvalidSchema, ShutdownException
from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.exception import ProtobufException
Expand Down Expand Up @@ -141,6 +142,7 @@ def __init__(
self.consumer: KafkaConsumer | None = None
self._offset_watcher = offset_watcher
self.stats = StatsClient(config=config)
self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config)

# Thread synchronization objects
# - offset is used by the REST API to wait until this thread has
Expand Down Expand Up @@ -185,7 +187,8 @@ def run(self) -> None:
LOG.warning("[Admin Client] No Brokers available yet. Retrying")
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
except KafkaConfigurationError:
LOG.info("[Admin Client] Invalid configuration. Bailing")
LOG.warning("[Admin Client] Invalid configuration. Bailing")
self._stop_schema_reader.set()
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Admin Client] Unexpected exception. Retrying")
Expand All @@ -202,9 +205,9 @@ def run(self) -> None:
LOG.warning("[Consumer] No Brokers available yet. Retrying")
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
except KafkaConfigurationError:
LOG.info("[Consumer] Invalid configuration. Bailing")
LOG.warning("[Consumer] Invalid configuration. Bailing")
self._stop_schema_reader.set()
shutdown()
raise
except Exception as e: # pylint: disable=broad-except
LOG.exception("[Consumer] Unexpected exception. Retrying")
self.stats.unexpected_exception(ex=e, where="consumer_instantiation")
Expand Down Expand Up @@ -249,7 +252,7 @@ def run(self) -> None:
shutdown()
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.exception("Unexpected exception in schema reader loop")
LOG.warning("Unexpected exception in schema reader loop - %s", e)

def _get_beginning_offset(self) -> int:
assert self.consumer is not None, "Thread must be started"
Expand Down Expand Up @@ -359,15 +362,19 @@ def handle_messages(self) -> None:
key = json_decode(message_key)
except JSONDecodeError as exc:
LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
raise CorruptKafkaRecordException from exc
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue # [non-strict mode]
except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
LOG.error(
"Kafka authorization error when consuming from %s: %s %s",
self.config["topic_name"],
exc,
msg.error(),
)
raise ShutdownException from exc
if self.kafka_error_handler.schema_reader_strict_mode:
raise ShutdownException from exc
continue

assert isinstance(key, dict)
msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE
Expand All @@ -386,13 +393,17 @@ def handle_messages(self) -> None:
value = self._parse_message_value(message_value)
except (JSONDecodeError, TypeError) as exc:
LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
raise CorruptKafkaRecordException from exc
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue # [non-strict mode]

try:
self.handle_msg(key, value)
self.offset = msg.offset()
except (InvalidSchema, TypeError) as exc:
raise CorruptKafkaRecordException from exc
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue
finally:
self.offset = msg.offset()

if msg_keymode == KeyMode.CANONICAL:
schema_records_processed_keymode_canonical += 1
Expand Down
12 changes: 3 additions & 9 deletions tests/integration/backup/test_legacy_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from aiohttp.client_exceptions import ClientError
from aiokafka.errors import InvalidTopicError
from karapace.backup import api
from karapace.backup.api import BackupVersion
Expand Down Expand Up @@ -237,14 +236,9 @@ async def test_backup_restore(
topic_name=api.normalize_topic_name(None, config),
)
time.sleep(1.0)

# Restoring a `v1` backup with an invalid schema stops the service as expected, but I am
# unsure why the logic mismatch, needs further investigation.
if backup_file_version == "v1":
with pytest.raises(ClientError):
await registry_async_client.get(f"subjects/{subject}/versions")
else:
await registry_async_client.get(f"subjects/{subject}/versions")
res = await registry_async_client.get(f"subjects/{subject}/versions")
assert res.status_code == 200
assert res.json() == [1]

_assert_canonical_key_format(
bootstrap_servers=kafka_servers.bootstrap_servers, schemas_topic=registry_cluster.schemas_topic
Expand Down
78 changes: 78 additions & 0 deletions tests/unit/test_kafka_error_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from _pytest.logging import LogCaptureFixture
from karapace.errors import CorruptKafkaRecordException
from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation

import aiokafka.errors as Errors
import logging
import pytest


@pytest.fixture(name="kafka_error_handler")
def fixture_kafka_error_handler() -> KafkaErrorHandler:
config = {
"kafka_schema_reader_strict_mode": False,
"kafka_retriable_errors_silenced": True,
}
return KafkaErrorHandler(config=config)


@pytest.mark.parametrize(
"retriable_error",
[
Errors.NodeNotReadyError("node is still starting"),
Errors.GroupCoordinatorNotAvailableError("group is unavailable"),
Errors.NoBrokersAvailable("no brokers available"),
],
)
def test_handle_error_retriable_schema_coordinator(
caplog: LogCaptureFixture,
kafka_error_handler: KafkaErrorHandler,
retriable_error: Errors.KafkaError,
):
kafka_error_handler.retriable_errors_silenced = True
with caplog.at_level(logging.WARNING, logger="karapace.error_handler"):
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error)

for log in caplog.records:
assert log.name == "karapace.kafka_error_handler"
assert log.levelname == "WARNING"
assert log.message == f"SCHEMA_COORDINATOR encountered error - {retriable_error}"

# Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour
kafka_error_handler.retriable_errors_silenced = False
with pytest.raises(retriable_error.__class__):
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error)


@pytest.mark.parametrize(
"nonretriable_error",
[
ValueError("value missing"),
Errors.GroupAuthorizationFailedError("authorization failed"),
Errors.InvalidCommitOffsetSizeError("invalid commit size"),
],
)
def test_handle_error_nonretriable_schema_coordinator(
kafka_error_handler: KafkaErrorHandler, nonretriable_error: BaseException
) -> None:
kafka_error_handler.retriable_errors_silenced = False
with pytest.raises(nonretriable_error.__class__):
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error)

# Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour
kafka_error_handler.retriable_errors_silenced = True
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error)


def test_handle_error_schema_reader(kafka_error_handler: KafkaErrorHandler) -> None:
kafka_error_handler.schema_reader_strict_mode = True
with pytest.raises(CorruptKafkaRecordException):
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception)

# Check that the config flag - `kafka_schema_reader_strict_mode` switches the behaviour
kafka_error_handler.schema_reader_strict_mode = False
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception)
31 changes: 11 additions & 20 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from confluent_kafka import Message
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.errors import CorruptKafkaRecordException
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import KeyFormatter
Expand Down Expand Up @@ -200,15 +199,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
consumer_mock = Mock(spec=KafkaConsumer)

schema_str = json.dumps(
{
"subject": "test",
"version": 1,
"id": 1,
"deleted": False,
"schema": json.dumps(
{"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
),
}
{"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
).encode()

ok1_message = Mock(spec=Message)
Expand Down Expand Up @@ -246,16 +237,16 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
schema_reader.handle_messages()
assert schema_reader.offset == 1
assert schema_reader.ready is False

with pytest.raises(CorruptKafkaRecordException):
schema_reader.handle_messages()
assert schema_reader.offset == 1
assert schema_reader.ready is False

with pytest.raises(CorruptKafkaRecordException):
schema_reader.handle_messages()
assert schema_reader.offset == 1
assert schema_reader.ready is False
schema_reader.handle_messages()
assert schema_reader.offset == 2
assert schema_reader.ready is False
schema_reader.handle_messages()
assert schema_reader.offset == 3
assert schema_reader.ready is False
schema_reader.handle_messages() # call last time to call _is_ready()
assert schema_reader.offset == 3
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP


def test_soft_deleted_schema_storing() -> None:
Expand Down

0 comments on commit 2e716a4

Please sign in to comment.