Skip to content

Commit

Permalink
tests: added tiered storage to node pool migration test
Browse files Browse the repository at this point in the history
Extended node pool migration test with tiered storage and different
cleanup policies.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 27, 2023
1 parent b0f0abd commit 3f3f79c
Showing 1 changed file with 75 additions and 16 deletions.
91 changes: 75 additions & 16 deletions tests/rptest/tests/node_pool_migration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from rptest.clients.types import TopicSpec
from rptest.tests.end_to_end import EndToEndTest
from rptest.services.admin import Admin
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RESTART_LOG_ALLOW_LIST, RedpandaService, make_redpanda_service
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RESTART_LOG_ALLOW_LIST, RedpandaService, SISettings, make_redpanda_service
from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode
from rptest.utils.node_operations import NodeDecommissionWaiter


Expand All @@ -39,9 +40,14 @@ class NodePoolMigrationTest(PreallocNodesTest):
def __init__(self, test_context):
self._topic = None

super(NodePoolMigrationTest, self).__init__(test_context=test_context,
num_brokers=10,
node_prealloc_count=1)
super(NodePoolMigrationTest, self).__init__(
test_context=test_context,
num_brokers=10,
node_prealloc_count=1,
si_settings=SISettings(test_context,
cloud_storage_enable_remote_read=True,
cloud_storage_enable_remote_write=True,
fast_uploads=True))

def setup(self):
# defer starting redpanda to test body
Expand Down Expand Up @@ -70,10 +76,18 @@ def _create_topics(self, replication_factors=[1, 3]):
for spec in topics:
self.client().create_topic(spec)

self._topic = random.choice(topics).name

return total_partitions

def _create_workload_topic(self, cleanup_policy):
spec = TopicSpec(name=f"migration-test-workload",
partition_count=8,
replication_factor=3,
cleanup_policy=cleanup_policy,
segment_bytes=self.segment_size)

self.client().create_topic(spec)
self._topic = spec.name

# after node was removed the state should be consistent on all other not removed nodes
def _check_state_consistent(self, decommissioned_id):

Expand Down Expand Up @@ -159,16 +173,25 @@ def decommissioned():

@property
def msg_size(self):
return 64
return 4096

@property
def msg_count(self):
return int(20 * self.producer_throughput / self.msg_size)
return int(100 if self.debug_mode else 5000 * self.segment_size /
self.msg_size)

@property
def producer_throughput(self):
return 1024 if self.debug_mode else 1024 * 1024

@property
def segment_size(self):
return 1024 * 1024

@property
def local_retention_bytes(self):
return 4 * self.segment_size

def start_producer(self):
self.logger.info(
f"starting kgo-verifier producer with {self.msg_count} messages of size {self.msg_size} and throughput: {self.producer_throughput} bps"
Expand All @@ -179,8 +202,8 @@ def start_producer(self):
self._topic,
self.msg_size,
self.msg_count,
custom_node=self.preallocated_nodes,
rate_limit_bps=self.producer_throughput)
key_set_cardinality=10000,
custom_node=self.preallocated_nodes)

self.producer.start(clean=False)

Expand Down Expand Up @@ -233,22 +256,58 @@ def _replicas_per_node(self):
return node_replicas

@cluster(num_nodes=11, log_allow_list=RESTART_LOG_ALLOW_LIST)
@matrix(balancing_mode=['off', 'node_add'])
def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode):
@matrix(balancing_mode=["off", 'node_add'],
tiered_storage=[True, False],
cleanup_policy=["compact", "compact,delete"])
def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode,
tiered_storage,
cleanup_policy):
'''
This test executes migration of 3 nodes redpanda cluster from one
set of nodes to the other, during this operation nodes from target pool
are first added to the cluster and then the old pool of nodes is decommissioned.
'''

if self.debug_mode:
self.redpanda._si_settings = None
cleanup_on_early_exit(self)
return

initial_pool = self.redpanda.nodes[0:5]
new_pool = self.redpanda.nodes[5:]

self.redpanda.set_seed_servers(initial_pool)

# start redpanda on initial pool of nodes
self.redpanda.start(nodes=initial_pool,
auto_assign_node_id=True,
omit_seeds_on_idx_one=False)
self.admin.patch_cluster_config(
upsert={"partition_autobalancing_mode": balancing_mode})

cfg = {"partition_autobalancing_mode": balancing_mode}
if tiered_storage:
cfg["cloud_storage_enable_remote_write"] = True
cfg["cloud_storage_enable_remote_read"] = True
# we want data to be actually deleted
cfg["retention_local_strict"] = True

self.admin.patch_cluster_config(upsert=cfg)

self._create_topics()

self._create_workload_topic(cleanup_policy=cleanup_policy)
if tiered_storage:
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(
self._topic, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
self.local_retention_bytes)

self.start_producer()
self.start_consumer()

# wait for some messages before executing actions (50 segments)
self.producer.wait_for_acks(50 * self.segment_size // self.msg_size,
timeout_sec=60,
backoff_sec=2)
# add new nodes to the cluster
self.redpanda.for_nodes(
new_pool,
Expand Down Expand Up @@ -283,8 +342,6 @@ def all_nodes_present():

self._wait_for_nodes_removed(decommissioned_ids)

self.verify()

def _all_nodes_balanced():
r_per_node = self._replicas_per_node()
self.logger.info(f"finished with {r_per_node} replicas per node")
Expand All @@ -299,3 +356,5 @@ def _all_nodes_balanced():

wait_until(_all_nodes_balanced, 60, 1,
f"Partitions are not balanced correctly")

self.verify()

0 comments on commit 3f3f79c

Please sign in to comment.