diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py index 4b92d8ec6..129ad96d4 100644 --- a/karapace/kafka_utils.py +++ b/karapace/kafka_utils.py @@ -40,6 +40,7 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons sasl_plain_username=config["sasl_plain_username"], sasl_plain_password=config["sasl_plain_password"], auto_offset_reset="earliest", + session_timeout_ms=config["session_timeout_ms"], metadata_max_age_ms=config["metadata_max_age_ms"], ) try: diff --git a/tests/integration/backup/test_session_timeout.py b/tests/integration/backup/test_session_timeout.py new file mode 100644 index 000000000..b585e759a --- /dev/null +++ b/tests/integration/backup/test_session_timeout.py @@ -0,0 +1,109 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from aiokafka.errors import NoBrokersAvailable +from confluent_kafka.admin import NewTopic +from karapace.backup.api import BackupVersion, create_backup +from karapace.config import Config, DEFAULTS, set_config_defaults +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka_utils import kafka_producer_from_config +from pathlib import Path +from tests.integration.conftest import create_kafka_server +from tests.integration.utils.config import KafkaDescription +from tests.integration.utils.kafka_server import KafkaServers +from tests.integration.utils.network import PortRangeInclusive + +import pytest + +SESSION_TIMEOUT_MS = 65000 +GROUP_MIN_SESSION_TIMEOUT_MS = 60000 +GROUP_MAX_SESSION_TIMEOUT_MS = 70000 + + +# use a dedicated kafka server with specific values for +# group.min.session.timeout.ms and group.max.session.timeout.ms +@pytest.fixture(scope="function", name="kafka_server_session_timeout") +def fixture_kafka_server( + kafka_description: KafkaDescription, + port_range: PortRangeInclusive, + tmp_path_factory: pytest.TempPathFactory, +): + # use custom data and log dir to avoid conflict with other kafka servers + session_datadir = tmp_path_factory.mktemp("kafka_server_min_data") + session_logdir = tmp_path_factory.mktemp("kafka_server_min_log") + kafka_config_extra = { + "group.min.session.timeout.ms": GROUP_MIN_SESSION_TIMEOUT_MS, + "group.max.session.timeout.ms": GROUP_MAX_SESSION_TIMEOUT_MS, + } + yield from create_kafka_server( + session_datadir, + session_logdir, + kafka_description, + port_range, + kafka_config_extra, + ) + + +def test_producer_with_custom_kafka_properties_does_not_fail( + kafka_server_session_timeout: KafkaServers, + new_topic: NewTopic, + tmp_path: Path, +) -> None: + """ + This test checks wether the custom properties are accepted by kafka. + We know by the implementation of the consumer startup code that if + `group.session.min.timeout.ms` > `session.timeout.ms` the consumer + will raise an exception during the startup. + This test ensures that the `session.timeout.ms` can be injected in + the kafka config so that the exception isn't raised + """ + config = set_config_defaults( + Config(bootstrap_uri=kafka_server_session_timeout.bootstrap_servers, session_timeout_ms=SESSION_TIMEOUT_MS) + ) + + admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers) + admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1) + + with kafka_producer_from_config(config) as producer: + producer.send( + new_topic.topic, + key=b"foo", + value=b"bar", + partition=0, + headers=[ + ("some-header", b"some header value"), + ("other-header", b"some other header value"), + ], + timestamp=1683474657, + ) + producer.flush() + + # without performing the backup the exception isn't raised. + create_backup( + config=config, + backup_location=tmp_path / "backup", + topic_name=new_topic.topic, + version=BackupVersion.V3, + replication_factor=1, + ) + + +def test_producer_with_custom_kafka_properties_fail( + kafka_server_session_timeout: KafkaServers, + new_topic: NewTopic, +) -> None: + """ + This test checks wether the custom properties are accepted by kafka. + We know by the implementation of the consumer startup code that if + `group.session.min.timeout.ms` > `session.timeout.ms` the consumer + will raise an exception during the startup. + This test ensures that the `session.timeout.ms` can be injected in + the kafka config so that the exception isn't raised + """ + admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers) + admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1) + + with pytest.raises(NoBrokersAvailable): + with kafka_producer_from_config(DEFAULTS) as producer: + _ = producer diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6ed4f4b22..cc86a3c36 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -4,6 +4,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from _pytest.fixtures import SubRequest from aiohttp.pytest_plugin import AiohttpClient from aiohttp.test_utils import TestClient @@ -32,7 +34,7 @@ from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk from tests.utils import repeat_until_successful_request -from typing import AsyncIterator, Iterator, List, Optional +from typing import AsyncIterator, Iterator from urllib.parse import urlparse import asyncio @@ -120,6 +122,24 @@ def fixture_kafka_server( yield kafka_servers return + yield from create_kafka_server( + session_datadir, + session_logdir, + kafka_description, + port_range, + ) + + +def create_kafka_server( + session_datadir: Path, + session_logdir: Path, + kafka_description: KafkaDescription, + port_range: PortRangeInclusive, + kafka_properties: dict[str, int | str] | None = None, +) -> Iterator[KafkaServers]: + if kafka_properties is None: + kafka_properties = {} + zk_dir = session_logdir / "zk" # File used to share data among test runners, including the dynamic @@ -170,6 +190,7 @@ def fixture_kafka_server( kafka_config=kafka_config, kafka_description=kafka_description, log4j_config=KAFKA_LOG4J, + kafka_properties=kafka_properties, ) stack.callback(stop_process, kafka_proc) @@ -269,7 +290,7 @@ async def fixture_rest_async( tmp_path: Path, kafka_servers: KafkaServers, registry_async_client: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -342,7 +363,7 @@ async def fixture_rest_async_novalidation( tmp_path: Path, kafka_servers: KafkaServers, registry_async_client: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -415,7 +436,7 @@ async def fixture_rest_async_registry_auth( loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument kafka_servers: KafkaServers, registry_async_client_auth: Client, -) -> AsyncIterator[Optional[KafkaRest]]: +) -> AsyncIterator[KafkaRest | None]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in # the election process. Without proper configuration for the listeners that @@ -486,7 +507,7 @@ async def fixture_registry_async_pair( session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, -) -> AsyncIterator[List[str]]: +) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers and returns their URL endpoints.""" config1: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers} @@ -701,7 +722,7 @@ async def fixture_registry_async_auth_pair( session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, -) -> AsyncIterator[List[str]]: +) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints.""" config1: Config = { diff --git a/tests/integration/utils/kafka_server.py b/tests/integration/utils/kafka_server.py index fb6fff7e4..520315b73 100644 --- a/tests/integration/utils/kafka_server.py +++ b/tests/integration/utils/kafka_server.py @@ -2,6 +2,8 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from aiokafka.errors import AuthenticationFailedError, NoBrokersAvailable from dataclasses import dataclass from karapace.kafka.admin import KafkaAdminClient @@ -11,7 +13,6 @@ from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig from tests.integration.utils.process import get_java_process_configuration from tests.utils import write_ini -from typing import Dict, List import logging import os @@ -24,7 +25,7 @@ @dataclass class KafkaServers: - bootstrap_servers: List[str] + bootstrap_servers: list[str] def __post_init__(self) -> None: is_bootstrap_uris_valid = ( @@ -100,7 +101,7 @@ def kafka_java_args( logs_dir: str, log4j_properties_path: str, kafka_description: KafkaDescription, -) -> List[str]: +) -> list[str]: msg = f"Couldn't find kafka installation at {kafka_description.install_dir} to run integration tests." assert kafka_description.install_dir.exists(), msg java_args = [ @@ -121,6 +122,7 @@ def configure_and_start_kafka( kafka_config: KafkaConfig, kafka_description: KafkaDescription, log4j_config: str, + kafka_properties: dict[str, str | int], ) -> Popen: config_path = Path(kafka_config.logdir) / "server.properties" @@ -167,6 +169,7 @@ def configure_and_start_kafka( "zookeeper.connection.timeout.ms": 6000, "zookeeper.connect": f"127.0.0.1:{zk_config.client_port}", } + kafka_ini.update(kafka_properties) write_ini(config_path, kafka_ini) @@ -179,6 +182,6 @@ def configure_and_start_kafka( kafka_description=kafka_description, ), ) - env: Dict[bytes, bytes] = {} + env: dict[bytes, bytes] = {} proc = Popen(kafka_cmd, env=env) # pylint: disable=consider-using-with return proc