diff --git a/karapace/protobuf/exception.py b/karapace/protobuf/exception.py index f37686256..58569bac9 100644 --- a/karapace/protobuf/exception.py +++ b/karapace/protobuf/exception.py @@ -12,14 +12,6 @@ from karapace.protobuf.schema import ProtobufSchema -class IllegalStateException(Exception): - pass - - -class IllegalArgumentException(Exception): - pass - - class Error(Exception): """Base class for errors in this module.""" @@ -28,10 +20,18 @@ class ProtobufException(Error): """Generic Protobuf schema error.""" -class ProtobufTypeException(Error): +class ProtobufTypeException(ProtobufException): """Generic Protobuf type error.""" +class IllegalStateException(ProtobufException): + pass + + +class IllegalArgumentException(ProtobufException): + pass + + class ProtobufUnresolvedDependencyException(ProtobufException): """a Protobuf schema has unresolved dependency""" diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index f36b85f67..5bd0f6d5d 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -472,7 +472,7 @@ def _handle_msg_config(self, key: dict, value: dict | None) -> None: def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument if value is None: LOG.warning("DELETE_SUBJECT record doesnt have a value, should have") - return + raise ValueError("DELETE_SUBJECT record doesnt have a value, should have") subject = value["subject"] version = Version(value["version"]) diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index afbcbb976..9d7bd9342 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -6,6 +6,7 @@ """ from _pytest.logging import LogCaptureFixture +from aiokafka.errors import KafkaError from concurrent.futures import ThreadPoolExecutor from confluent_kafka import Message from dataclasses import dataclass @@ -18,11 +19,15 @@ KafkaSchemaReader, MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP, MAX_MESSAGES_TO_CONSUME_ON_STARTUP, + MessageType, OFFSET_EMPTY, OFFSET_UNINITIALIZED, ) +from karapace.schema_type import SchemaType from karapace.typing import SchemaId, Version from tests.base_testcase import BaseTestCase +from tests.utils import schema_protobuf_invalid +from typing import Callable from unittest.mock import Mock import confluent_kafka @@ -318,3 +323,191 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None: assert log.name == "karapace.schema_reader" assert log.levelname == "WARNING" assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have" + + +@dataclass +class KafkaMessageHandlingErrorTestCase(BaseTestCase): + key: bytes + value: bytes + schema_type: SchemaType | None + message_type: MessageType | None + expected_error: ShutdownException + expected_log_message: str + + +@pytest.fixture(name="schema_reader_with_consumer_messages_factory") +def fixture_schema_reader_with_consumer_messages_factory() -> Callable[[tuple[list[Message]]], KafkaSchemaReader]: + def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader: + key_formatter_mock = Mock(spec=KeyFormatter) + consumer_mock = Mock(spec=KafkaConsumer) + + consumer_mock.consume.side_effect = consumer_messages + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, 4) + + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=DEFAULTS, + offset_watcher=offset_watcher, + key_formatter=key_formatter_mock, + master_coordinator=None, + database=InMemoryDatabase(), + ) + schema_reader.consumer = consumer_mock + schema_reader.offset = 0 + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP + return schema_reader + + return factory + + +@pytest.fixture(name="message_factory") +def fixture_message_factory() -> Callable[[bytes, bytes, int, KafkaError], Message]: + def factory(key: bytes, value: bytes, offset: int = 1, error: KafkaError | None = None) -> Message: + message = Mock(spec=Message) + message.key.return_value = key + message.value.return_value = value + message.offset.return_value = offset + message.error.return_value = error + return message + + return factory + + +@pytest.mark.parametrize( + "test_case", + [ + KafkaMessageHandlingErrorTestCase( + test_name="Message key is not valid JSON", + key=b'{subject1::::"test""version":1"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.key() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Keytype is missing from message key", + key=b'{"subject":"test","version":1,"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'subject': 'test', 'version': 1, 'magic': 1}-" + "{'value': 'value does not matter at this stage, just correct JSON'} " + "has been discarded because doesn't contain the `keytype` key in the key" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Keytype is invalid on message key", + key=b'{"keytype":"NOT_A_VALID_KEY_TYPE","subject":"test","version":1,"magic":1}', + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=None, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'NOT_A_VALID_KEY_TYPE', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'value': 'value does not matter at this stage, just correct JSON'} " + "has been discarded because the NOT_A_VALID_KEY_TYPE is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Config message value is not valid JSON", + key=b'{"keytype":"CONFIG","subject":null,"magic":0}', + value=(b'no-valid-jason"compatibilityLevel": "BACKWARD""'), + schema_type=None, + message_type=MessageType.config, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.value() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Config message value is not valid config setting", + key=b'{"keytype":"CONFIG","subject":null,"magic":0}', + value=b'{"not_the_key_name":"INVALID_CONFIG"}', + schema_type=None, + message_type=MessageType.config, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'CONFIG', 'subject': None, 'magic': 0}-" + "{'not_the_key_name': 'INVALID_CONFIG'} has been discarded because the CONFIG is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Version in schema message value is not valid", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"subject": "test", "version": "invalid-version", "id": 1, "deleted": false,' + b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": ' + b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}' + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'SCHEMA', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'subject': 'test', 'version': 'invalid-version', 'id': 1, 'deleted': False, 'schema': " + '\'{"name": "test", "type": "record", "fields": [{"name": "test_field", "type": ["string", "int"]}]}\'} ' + "has been discarded because the SCHEMA is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Message value is not valid JSON", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'no-valid-json"version": 1, "id": 1, "deleted": false,' + b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": ' + b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}' + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Invalid JSON in msg.value() at offset 1", + ), + KafkaMessageHandlingErrorTestCase( + test_name="Delete subject message value is missing `subject` field", + key=b'{"keytype":"DELETE_SUBJECT","subject":"test","version":1,"magic":1}', + value=b'{"not-subject-key":"test","version":1}', + schema_type=None, + message_type=MessageType.delete_subject, + expected_error=CorruptKafkaRecordException, + expected_log_message=( + "The message {'keytype': 'DELETE_SUBJECT', 'subject': 'test', 'version': 1, 'magic': 1}-" + "{'not-subject-key': 'test', 'version': 1} has been discarded because the DELETE_SUBJECT is not managed" + ), + ), + KafkaMessageHandlingErrorTestCase( + test_name="Protobuf schema is invalid", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":' + + json.dumps(schema_protobuf_invalid).encode() + + b"}" + ), + schema_type=SchemaType.PROTOBUF, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Schema is not valid ProtoBuf definition", + ), + ], +) +def test_message_error_handling( + caplog: LogCaptureFixture, + test_case: KafkaMessageHandlingErrorTestCase, + schema_reader_with_consumer_messages_factory: Callable[[tuple[list[Message]]], KafkaSchemaReader], + message_factory: Callable[[bytes, bytes, int, KafkaError], Message], +) -> None: + message = message_factory(key=test_case.key, value=test_case.value) + consumer_messages = ([message],) + schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages) + + with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): + with pytest.raises(test_case.expected_error): + schema_reader.handle_messages() + + assert schema_reader.offset == 0 + assert not schema_reader.ready + for log in caplog.records: + assert log.name == "karapace.schema_reader" + assert log.levelname == "WARNING" + assert log.message == test_case.expected_log_message diff --git a/tests/utils.py b/tests/utils.py index f38097858..352745f94 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -148,6 +148,22 @@ {"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}}, ] +schema_protobuf_invalid = """ +|o3" +| +|opti -- om.codingharbour.protobuf"; +|option java_outer_classname = "TestEnumOrder"; +| +|message Message { +| int32 +| speed =; +|} +|Enum +| HIGH = 0 +| MIDDLE = ; +""" +schema_protobuf_invalid = trim_margin(schema_protobuf_invalid) + schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)} second_schema_json = json.dumps(