Skip to content

Commit

Permalink
Merge pull request #24446 from ztlpn/iceberg-test-recovery-mode
Browse files Browse the repository at this point in the history
Test datalake with recovery mode and disabled partitions
  • Loading branch information
ztlpn authored Dec 11, 2024
2 parents 2ad9e2b + 80d3ac2 commit 942f964
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 6 deletions.
15 changes: 9 additions & 6 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2402,7 +2402,8 @@ bool application::wasm_data_transforms_enabled() {
}

bool application::datalake_enabled() {
return config::shard_local_cfg().iceberg_enabled();
return config::shard_local_cfg().iceberg_enabled()
&& !config::node().recovery_mode_enabled();
}

ss::future<>
Expand Down Expand Up @@ -3143,11 +3144,13 @@ void application::start_runtime_services(
smp_service_groups.cluster_smp_sg(),
std::ref(controller->get_data_migration_frontend()),
std::ref(controller->get_data_migration_irpc_frontend())));
runtime_services.push_back(
std::make_unique<datalake::coordinator::rpc::service>(
sched_groups.datalake_sg(),
smp_service_groups.datalake_sg(),
&_datalake_coordinator_fe));
if (datalake_enabled()) {
runtime_services.push_back(
std::make_unique<datalake::coordinator::rpc::service>(
sched_groups.datalake_sg(),
smp_service_groups.datalake_sg(),
&_datalake_coordinator_fe));
}

s.add_services(std::move(runtime_services));

Expand Down
125 changes: 125 additions & 0 deletions tests/rptest/tests/datalake/recovery_mode_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import time
import random

from ducktape.mark import matrix
from ducktape.utils.util import wait_until

from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec
from rptest.services.redpanda import SISettings
from rptest.services.admin import Admin
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types


class DatalakeRecoveryModeTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super(DatalakeRecoveryModeTest,
self).__init__(test_ctx,
num_brokers=3,
si_settings=SISettings(test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
},
*args,
**kwargs)

def setUp(self):
# redpanda will be started by DatalakeServices
pass

@cluster(num_nodes=6)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[True, False])
def test_recovery_mode(self, cloud_storage_type, filesystem_catalog_mode):
with DatalakeServices(self.test_context,
redpanda=self.redpanda,
filesystem_catalog_mode=filesystem_catalog_mode,
include_query_engines=[QueryEngineType.SPARK
]) as dl:
count = 1000
rpk = RpkTool(self.redpanda)

dl.create_iceberg_enabled_topic("foo", partitions=10)
rpk.create_topic("bar", partitions=10, replicas=3)

dl.produce_to_topic("foo", 1024, count)

# test partial recovery mode
self.redpanda.restart_nodes(
random.sample(self.redpanda.nodes, 1),
override_cfg_params={"recovery_mode_enabled": True})

time.sleep(15)

self.redpanda.restart_nodes(
self.redpanda.nodes,
override_cfg_params={"recovery_mode_enabled": True})
self.redpanda.wait_for_membership(first_start=False)

admin = Admin(self.redpanda)
admin.await_stable_leader(namespace="redpanda", topic="controller")

rpk.alter_topic_config("bar", TopicSpec.PROPERTY_ICEBERG_MODE,
"key_value")
time.sleep(15)

self.redpanda.restart_nodes(
self.redpanda.nodes,
override_cfg_params={"recovery_mode_enabled": False})
self.redpanda.wait_for_membership(first_start=False)

dl.produce_to_topic("foo", 1024, count)
dl.produce_to_topic("bar", 1024, count)

dl.wait_for_translation("foo", msg_count=2 * count)
dl.wait_for_translation("bar", msg_count=count)

@cluster(num_nodes=6)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[True, False])
def test_disabled_partitions(self, cloud_storage_type,
filesystem_catalog_mode):
with DatalakeServices(self.test_context,
redpanda=self.redpanda,
filesystem_catalog_mode=filesystem_catalog_mode,
include_query_engines=[QueryEngineType.SPARK
]) as dl:

count = 1000
rpk = RpkTool(self.redpanda)

dl.create_iceberg_enabled_topic("foo", partitions=10)
rpk.create_topic("bar", partitions=10, replicas=3)

dl.produce_to_topic("foo", 1024, count)

admin = Admin(self.redpanda)
admin.set_partitions_disabled(ns="kafka", topic="foo")
admin.set_partitions_disabled(ns="kafka", topic="bar")

rpk.alter_topic_config("bar", TopicSpec.PROPERTY_ICEBERG_MODE,
"key_value")

time.sleep(15)
admin.set_partitions_disabled(ns="kafka", topic="foo", value=False)
admin.set_partitions_disabled(ns="kafka", topic="bar", value=False)

dl.produce_to_topic("foo", 1024, count)
dl.produce_to_topic("bar", 1024, count)

dl.wait_for_translation("foo", msg_count=2 * count)
dl.wait_for_translation("bar", msg_count=count)

0 comments on commit 942f964

Please sign in to comment.