Test datalake with recovery mode and disabled partitions #24446

Merged
3 commits merged on Dec 11, 2024
15 changes: 9 additions & 6 deletions src/v/redpanda/application.cc
@@ -2402,7 +2402,8 @@ bool application::wasm_data_transforms_enabled() {
}

bool application::datalake_enabled() {
-    return config::shard_local_cfg().iceberg_enabled();
+    return config::shard_local_cfg().iceberg_enabled()
+           && !config::node().recovery_mode_enabled();
}

ss::future<>
@@ -3143,11 +3144,13 @@ void application::start_runtime_services(
smp_service_groups.cluster_smp_sg(),
std::ref(controller->get_data_migration_frontend()),
std::ref(controller->get_data_migration_irpc_frontend())));
-    runtime_services.push_back(
-      std::make_unique<datalake::coordinator::rpc::service>(
-        sched_groups.datalake_sg(),
-        smp_service_groups.datalake_sg(),
-        &_datalake_coordinator_fe));
+    if (datalake_enabled()) {
+        runtime_services.push_back(
+          std::make_unique<datalake::coordinator::rpc::service>(
+            sched_groups.datalake_sg(),
+            smp_service_groups.datalake_sg(),
+            &_datalake_coordinator_fe));
+    }

s.add_services(std::move(runtime_services));

125 changes: 125 additions & 0 deletions tests/rptest/tests/datalake/recovery_mode_test.py
@@ -0,0 +1,125 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import time
import random

from ducktape.mark import matrix
from ducktape.utils.util import wait_until

from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec
from rptest.services.redpanda import SISettings
from rptest.services.admin import Admin
from rptest.tests.redpanda_test import RedpandaTest
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types


class DatalakeRecoveryModeTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super(DatalakeRecoveryModeTest,
self).__init__(test_ctx,
num_brokers=3,
si_settings=SISettings(test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
},
*args,
**kwargs)

def setUp(self):
# redpanda will be started by DatalakeServices
pass

@cluster(num_nodes=6)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[True, False])
def test_recovery_mode(self, cloud_storage_type, filesystem_catalog_mode):
with DatalakeServices(self.test_context,
redpanda=self.redpanda,
filesystem_catalog_mode=filesystem_catalog_mode,
include_query_engines=[QueryEngineType.SPARK
]) as dl:
count = 1000
rpk = RpkTool(self.redpanda)

dl.create_iceberg_enabled_topic("foo", partitions=10)
rpk.create_topic("bar", partitions=10, replicas=3)

dl.produce_to_topic("foo", 1024, count)

# test partial recovery mode
self.redpanda.restart_nodes(
random.sample(self.redpanda.nodes, 1),
override_cfg_params={"recovery_mode_enabled": True})

Review thread:

Contributor:
nit: maybe produce to the topic to make it more likely that there's more data that would be scheduled if not for recovery on the given node? And maybe again to "foo" and "bar" after enabling recovery mode for the cluster? At that point, maybe we could also check that the table doesn't grow before and after the sleep.

Contributor:
Same in the next test -- or does recovery mode prevent us from producing?

Contributor (Author):
> does recovery mode prevent us from producing?

Yes exactly. I thought about adding some checks that the table doesn't grow in recovery mode, but they would be either trivial or inherently flaky.

(A sketch of the suggested check appears after the test file diff below.)

time.sleep(15)

self.redpanda.restart_nodes(
self.redpanda.nodes,
override_cfg_params={"recovery_mode_enabled": True})
self.redpanda.wait_for_membership(first_start=False)

admin = Admin(self.redpanda)
admin.await_stable_leader(namespace="redpanda", topic="controller")

rpk.alter_topic_config("bar", TopicSpec.PROPERTY_ICEBERG_MODE,
"key_value")
time.sleep(15)

self.redpanda.restart_nodes(
self.redpanda.nodes,
override_cfg_params={"recovery_mode_enabled": False})
self.redpanda.wait_for_membership(first_start=False)

dl.produce_to_topic("foo", 1024, count)
dl.produce_to_topic("bar", 1024, count)

dl.wait_for_translation("foo", msg_count=2 * count)
dl.wait_for_translation("bar", msg_count=count)

@cluster(num_nodes=6)
@matrix(cloud_storage_type=supported_storage_types(),
filesystem_catalog_mode=[True, False])
def test_disabled_partitions(self, cloud_storage_type,
filesystem_catalog_mode):
with DatalakeServices(self.test_context,
redpanda=self.redpanda,
filesystem_catalog_mode=filesystem_catalog_mode,
include_query_engines=[QueryEngineType.SPARK
]) as dl:

count = 1000
rpk = RpkTool(self.redpanda)

dl.create_iceberg_enabled_topic("foo", partitions=10)
rpk.create_topic("bar", partitions=10, replicas=3)

dl.produce_to_topic("foo", 1024, count)

admin = Admin(self.redpanda)
admin.set_partitions_disabled(ns="kafka", topic="foo")
admin.set_partitions_disabled(ns="kafka", topic="bar")

rpk.alter_topic_config("bar", TopicSpec.PROPERTY_ICEBERG_MODE,
"key_value")

time.sleep(15)
admin.set_partitions_disabled(ns="kafka", topic="foo", value=False)
admin.set_partitions_disabled(ns="kafka", topic="bar", value=False)

dl.produce_to_topic("foo", 1024, count)
dl.produce_to_topic("bar", 1024, count)

dl.wait_for_translation("foo", msg_count=2 * count)
dl.wait_for_translation("bar", msg_count=count)