feat: enable the customization of the kafka properties
eliax1996 committed Jul 30, 2024
1 parent 8c50eb0 commit 71ff3bb
Showing 4 changed files with 144 additions and 10 deletions.
1 change: 1 addition & 0 deletions karapace/kafka_utils.py
@@ -40,6 +40,7 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons
sasl_plain_username=config["sasl_plain_username"],
sasl_plain_password=config["sasl_plain_password"],
auto_offset_reset="earliest",
session_timeout_ms=config["session_timeout_ms"],
metadata_max_age_ms=config["metadata_max_age_ms"],
)
try:
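The new key is read straight from the Karapace config, so a caller can raise the consumer session timeout by setting `session_timeout_ms` before the consumer is built. A minimal sketch, assuming the same config pipeline used by the new test below; the bootstrap URI, topic name and timeout value are illustrative, and the context-manager usage mirrors how kafka_producer_from_config is used in the test:

    from karapace.config import Config, set_config_defaults
    from karapace.kafka_utils import kafka_consumer_from_config

    # Illustrative values: the timeout must fall between the broker's
    # group.min.session.timeout.ms and group.max.session.timeout.ms.
    config = set_config_defaults(
        Config(
            bootstrap_uri="127.0.0.1:9092",
            session_timeout_ms=65000,
        )
    )

    with kafka_consumer_from_config(config, topic="_schemas") as consumer:
        # the underlying consumer is now created with session_timeout_ms=65000
        ...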
109 changes: 109 additions & 0 deletions tests/integration/backup/test_session_timeout.py
@@ -0,0 +1,109 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from aiokafka.errors import NoBrokersAvailable
from confluent_kafka.admin import NewTopic
from karapace.backup.api import BackupVersion, create_backup
from karapace.config import Config, DEFAULTS, set_config_defaults
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka_utils import kafka_producer_from_config
from pathlib import Path
from tests.integration.conftest import create_kafka_server
from tests.integration.utils.config import KafkaDescription
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive

import pytest

SESSION_TIMEOUT_MS = 65000
GROUP_MIN_SESSION_TIMEOUT_MS = 60000
GROUP_MAX_SESSION_TIMEOUT_MS = 70000


# use a dedicated kafka server with specific values for
# group.min.session.timeout.ms and group.max.session.timeout.ms
@pytest.fixture(scope="function", name="kafka_server_session_timeout")
def fixture_kafka_server(
kafka_description: KafkaDescription,
port_range: PortRangeInclusive,
tmp_path_factory: pytest.TempPathFactory,
):
# use custom data and log dir to avoid conflict with other kafka servers
session_datadir = tmp_path_factory.mktemp("kafka_server_min_data")
session_logdir = tmp_path_factory.mktemp("kafka_server_min_log")
kafka_config_extra = {
"group.min.session.timeout.ms": GROUP_MIN_SESSION_TIMEOUT_MS,
"group.max.session.timeout.ms": GROUP_MAX_SESSION_TIMEOUT_MS,
}
yield from create_kafka_server(
session_datadir,
session_logdir,
kafka_description,
port_range,
kafka_config_extra,
)


def test_producer_with_custom_kafka_properties_does_not_fail(
kafka_server_session_timeout: KafkaServers,
new_topic: NewTopic,
tmp_path: Path,
) -> None:
"""
    This test checks whether the custom properties are accepted by Kafka.
    We know from the implementation of the consumer startup code that if
    `group.min.session.timeout.ms` > `session.timeout.ms`, the consumer
    raises an exception during startup.
    This test ensures that `session.timeout.ms` can be injected into the
    Kafka config so that the exception isn't raised.
"""
config = set_config_defaults(
Config(bootstrap_uri=kafka_server_session_timeout.bootstrap_servers, session_timeout_ms=SESSION_TIMEOUT_MS)
)

admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)

with kafka_producer_from_config(config) as producer:
producer.send(
new_topic.topic,
key=b"foo",
value=b"bar",
partition=0,
headers=[
("some-header", b"some header value"),
("other-header", b"some other header value"),
],
timestamp=1683474657,
)
producer.flush()

    # the exception is only raised when the backup is performed, since that
    # is when the consumer is created
create_backup(
config=config,
backup_location=tmp_path / "backup",
topic_name=new_topic.topic,
version=BackupVersion.V3,
replication_factor=1,
)


def test_producer_with_custom_kafka_properties_fail(
kafka_server_session_timeout: KafkaServers,
new_topic: NewTopic,
) -> None:
"""
    This test checks whether the custom properties are accepted by Kafka.
    We know from the implementation of the consumer startup code that if
    `group.min.session.timeout.ms` > `session.timeout.ms`, the consumer
    raises an exception during startup.
    This test ensures that, when the custom `session.timeout.ms` is not
    injected into the Kafka config (i.e. the defaults are used), the
    exception is raised.
"""
admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)

with pytest.raises(NoBrokersAvailable):
with kafka_producer_from_config(DEFAULTS) as producer:
_ = producer
33 changes: 27 additions & 6 deletions tests/integration/conftest.py
@@ -4,6 +4,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from _pytest.fixtures import SubRequest
from aiohttp.pytest_plugin import AiohttpClient
from aiohttp.test_utils import TestClient
@@ -32,7 +34,7 @@
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
from typing import AsyncIterator, Iterator, List, Optional
from typing import AsyncIterator, Iterator
from urllib.parse import urlparse

import asyncio
@@ -120,6 +122,24 @@ def fixture_kafka_server(
yield kafka_servers
return

yield from create_kafka_server(
session_datadir,
session_logdir,
kafka_description,
port_range,
)


def create_kafka_server(
session_datadir: Path,
session_logdir: Path,
kafka_description: KafkaDescription,
port_range: PortRangeInclusive,
kafka_properties: dict[str, int | str] | None = None,
) -> Iterator[KafkaServers]:
if kafka_properties is None:
kafka_properties = {}

zk_dir = session_logdir / "zk"

# File used to share data among test runners, including the dynamic
@@ -170,6 +190,7 @@ def fixture_kafka_server(
kafka_config=kafka_config,
kafka_description=kafka_description,
log4j_config=KAFKA_LOG4J,
kafka_properties=kafka_properties,
)
stack.callback(stop_process, kafka_proc)

@@ -269,7 +290,7 @@ async def fixture_rest_async(
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
) -> AsyncIterator[KafkaRest | None]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
@@ -342,7 +363,7 @@ async def fixture_rest_async_novalidation(
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
) -> AsyncIterator[KafkaRest | None]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
@@ -415,7 +436,7 @@ async def fixture_rest_async_registry_auth(
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
kafka_servers: KafkaServers,
registry_async_client_auth: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
) -> AsyncIterator[KafkaRest | None]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
@@ -486,7 +507,7 @@ async def fixture_registry_async_pair(
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[List[str]]:
) -> AsyncIterator[list[str]]:
"""Starts a cluster of two Schema Registry servers and returns their URL endpoints."""

config1: Config = {"bootstrap_uri": kafka_servers.bootstrap_servers}
@@ -701,7 +722,7 @@ async def fixture_registry_async_auth_pair(
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[List[str]]:
) -> AsyncIterator[list[str]]:
"""Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints."""

config1: Config = {
11 changes: 7 additions & 4 deletions tests/integration/utils/kafka_server.py
@@ -2,6 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from aiokafka.errors import AuthenticationFailedError, NoBrokersAvailable
from dataclasses import dataclass
from karapace.kafka.admin import KafkaAdminClient
@@ -11,7 +13,6 @@
from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig
from tests.integration.utils.process import get_java_process_configuration
from tests.utils import write_ini
from typing import Dict, List

import logging
import os
Expand All @@ -24,7 +25,7 @@

@dataclass
class KafkaServers:
bootstrap_servers: List[str]
bootstrap_servers: list[str]

def __post_init__(self) -> None:
is_bootstrap_uris_valid = (
@@ -100,7 +101,7 @@ def kafka_java_args(
logs_dir: str,
log4j_properties_path: str,
kafka_description: KafkaDescription,
) -> List[str]:
) -> list[str]:
msg = f"Couldn't find kafka installation at {kafka_description.install_dir} to run integration tests."
assert kafka_description.install_dir.exists(), msg
java_args = [
@@ -121,6 +122,7 @@ def configure_and_start_kafka(
kafka_config: KafkaConfig,
kafka_description: KafkaDescription,
log4j_config: str,
kafka_properties: dict[str, str | int],
) -> Popen:
config_path = Path(kafka_config.logdir) / "server.properties"

@@ -167,6 +169,7 @@ def configure_and_start_kafka(
"zookeeper.connection.timeout.ms": 6000,
"zookeeper.connect": f"127.0.0.1:{zk_config.client_port}",
}
kafka_ini.update(kafka_properties)

write_ini(config_path, kafka_ini)

@@ -179,6 +182,6 @@
kafka_description=kafka_description,
),
)
env: Dict[bytes, bytes] = {}
env: dict[bytes, bytes] = {}
proc = Popen(kafka_cmd, env=env) # pylint: disable=consider-using-with
return proc
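
The injection itself is just a dictionary merge: whatever the caller passes as `kafka_properties` is written verbatim into the generated server.properties. A minimal sketch of that behaviour, with the base config abbreviated and the override values taken from the new fixture above; the ZooKeeper address is illustrative:

    # Base broker config generated by configure_and_start_kafka() (abbreviated).
    kafka_ini = {
        "zookeeper.connection.timeout.ms": 6000,
        "zookeeper.connect": "127.0.0.1:2181",
    }

    # Caller-supplied overrides, e.g. from the session-timeout fixture.
    kafka_properties = {
        "group.min.session.timeout.ms": 60000,
        "group.max.session.timeout.ms": 70000,
    }

    # Custom keys extend (or override) the generated defaults before the file
    # is written out with write_ini(config_path, kafka_ini).
    kafka_ini.update(kafka_properties)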
