diff --git a/tests/rptest/tests/node_pool_migration_test.py b/tests/rptest/tests/node_pool_migration_test.py
index 6362ba0a87055..00a475444e7f2 100644
--- a/tests/rptest/tests/node_pool_migration_test.py
+++ b/tests/rptest/tests/node_pool_migration_test.py
@@ -8,28 +8,41 @@
 # by the Apache License, Version 2.0
 
 from concurrent.futures import ThreadPoolExecutor
-from math import ceil, floor
 import random
-from threading import Thread
+import re
 
 import requests
 from rptest.clients.kafka_cat import KafkaCat
-from time import sleep
-from rptest.clients.default import DefaultClient
 from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
 from rptest.tests.prealloc_nodes import PreallocNodesTest
 
 from rptest.clients.rpk import RpkTool
-from rptest.tests.redpanda_test import RedpandaTest
 from rptest.services.cluster import cluster
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
 from ducktape.mark import matrix
 from rptest.clients.types import TopicSpec
-from rptest.tests.end_to_end import EndToEndTest
 from rptest.services.admin import Admin
-from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RESTART_LOG_ALLOW_LIST, RedpandaService, make_redpanda_service
+from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings
+from rptest.utils.mode_checks import cleanup_on_early_exit
 from rptest.utils.node_operations import NodeDecommissionWaiter
+from enum import Enum
+
+TS_LOG_ALLOW_LIST = [
+    re.compile(
+        "archival_metadata_stm.*Replication wait for archival STM timed out"),
+    # topic deletion may happen before data were uploaded
+    re.compile("cloud_storage.*Failed to fetch manifest during finalize().*")
+]
+
+
+class TestMode(str, Enum):
+    NO_TIERED_STORAGE = "no_tiered_storage"
+    TIERED_STORAGE = "tiered_storage"
+    FAST_MOVES = "tiered_storage_fast_moves"
+
+    @property
+    def has_tiered_storage(self):
+        return self.value == self.TIERED_STORAGE or self.value == self.FAST_MOVES
 
 
 class NodePoolMigrationTest(PreallocNodesTest):
@@ -39,9 +52,14 @@ class NodePoolMigrationTest(PreallocNodesTest):
 
     def __init__(self, test_context):
         self._topic = None
-        super(NodePoolMigrationTest, self).__init__(test_context=test_context,
-                                                    num_brokers=10,
-                                                    node_prealloc_count=1)
+        super(NodePoolMigrationTest, self).__init__(
+            test_context=test_context,
+            num_brokers=10,
+            node_prealloc_count=1,
+            si_settings=SISettings(test_context,
+                                   cloud_storage_enable_remote_read=True,
+                                   cloud_storage_enable_remote_write=True,
+                                   fast_uploads=True))
 
     def setup(self):
         # defer starting redpanda to test body
@@ -70,10 +88,18 @@ def _create_topics(self, replication_factors=[1, 3]):
         for spec in topics:
             self.client().create_topic(spec)
 
-        self._topic = random.choice(topics).name
-
         return total_partitions
 
+    def _create_workload_topic(self, cleanup_policy):
+        spec = TopicSpec(name=f"migration-test-workload",
+                         partition_count=8,
+                         replication_factor=3,
+                         cleanup_policy=cleanup_policy,
+                         segment_bytes=self.segment_size)
+
+        self.client().create_topic(spec)
+        self._topic = spec.name
+
     # after node was removed the state should be consistent on all other not removed nodes
 
     def _check_state_consistent(self, decommissioned_id):
@@ -159,16 +185,25 @@ def decommissioned():
 
     @property
     def msg_size(self):
-        return 64
+        return 4096
 
     @property
     def msg_count(self):
-        return int(20 * self.producer_throughput / self.msg_size)
+        return int(100 if self.debug_mode else 5000 * self.segment_size /
+                   self.msg_size)
 
     @property
     def producer_throughput(self):
         return 1024 if self.debug_mode else 1024 * 1024
 
+    @property
+    def segment_size(self):
+        return 1024 * 1024
+
+    @property
+    def local_retention_bytes(self):
+        return 4 * self.segment_size
+
     def start_producer(self):
         self.logger.info(
             f"starting kgo-verifier producer with {self.msg_count} messages of size {self.msg_size} and throughput: {self.producer_throughput} bps"
@@ -179,8 +214,9 @@ def start_producer(self):
             self._topic,
             self.msg_size,
             self.msg_count,
+            key_set_cardinality=10000,
             custom_node=self.preallocated_nodes,
-            rate_limit_bps=self.producer_throughput)
+            debug_logs=True)
 
         self.producer.start(clean=False)
 
@@ -232,23 +268,69 @@ def _replicas_per_node(self):
 
         return node_replicas
 
-    @cluster(num_nodes=11, log_allow_list=RESTART_LOG_ALLOW_LIST)
-    @matrix(balancing_mode=['off', 'node_add'])
-    def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode):
+    @cluster(num_nodes=11,
+             log_allow_list=RESTART_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
+    @matrix(balancing_mode=["off", "node_add"],
+            test_mode=[
+                TestMode.NO_TIERED_STORAGE, TestMode.TIERED_STORAGE,
+                TestMode.FAST_MOVES
+            ],
+            cleanup_policy=["compact", "compact,delete"])
+    def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode,
+                                                  test_mode: TestMode,
+                                                  cleanup_policy):
+        '''
+        This test migrates a Redpanda cluster from one pool of nodes to
+        another: the nodes from the target pool are first added to the cluster
+        and then the old pool of nodes is decommissioned.
+        '''
+
+        if self.debug_mode:
+            self.redpanda._si_settings = None
+            cleanup_on_early_exit(self)
+            return
+
         initial_pool = self.redpanda.nodes[0:5]
         new_pool = self.redpanda.nodes[5:]
+
         self.redpanda.set_seed_servers(initial_pool)
 
         # start redpanda on initial pool of nodes
         self.redpanda.start(nodes=initial_pool,
                             auto_assign_node_id=True,
                             omit_seeds_on_idx_one=False)
-        self.admin.patch_cluster_config(
-            upsert={"partition_autobalancing_mode": balancing_mode})
+
+        cfg = {"partition_autobalancing_mode": balancing_mode}
+        if test_mode.has_tiered_storage:
+            cfg["cloud_storage_enable_remote_write"] = True
+            cfg["cloud_storage_enable_remote_read"] = True
+            # we want data to be actually deleted
+            cfg["retention_local_strict"] = True
+
+        if test_mode == TestMode.FAST_MOVES:
+            self.redpanda.set_cluster_config({
+                "initial_retention_local_target_bytes_default":
+                3 * self.segment_size
+            })
+
+        self.admin.patch_cluster_config(upsert=cfg)
+
         self._create_topics()
 
+        self._create_workload_topic(cleanup_policy=cleanup_policy)
+        if test_mode.has_tiered_storage:
+            rpk = RpkTool(self.redpanda)
+            rpk.alter_topic_config(
+                self._topic, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
+                self.local_retention_bytes)
+
         self.start_producer()
         self.start_consumer()
+
+        # wait for some messages before executing actions (50 segments)
+        self.producer.wait_for_acks(50 * self.segment_size // self.msg_size,
+                                    timeout_sec=60,
+                                    backoff_sec=2)
         # add new nodes to the cluster
         self.redpanda.for_nodes(
             new_pool,
@@ -283,8 +365,6 @@ def all_nodes_present():
 
         self._wait_for_nodes_removed(decommissioned_ids)
 
-        self.verify()
-
         def _all_nodes_balanced():
             r_per_node = self._replicas_per_node()
             self.logger.info(f"finished with {r_per_node} replicas per node")
@@ -299,3 +379,22 @@ def _all_nodes_balanced():
 
         wait_until(_all_nodes_balanced, 60, 1,
                    f"Partitions are not balanced correctly")
+
+        def _quiescent_state():
+            pb_status = self.admin.get_partition_balancer_status(
+                node=random.choice(new_pool))
+            reconfigurations = self.admin.list_reconfigurations(
+                node=random.choice(new_pool))
+            return len(reconfigurations) == 0 and pb_status['status'] in (
+                'ready', 'off')
+
+        wait_until(_quiescent_state,
+                   120,
+                   1,
+                   f"Cluster failed to reach quiescent state (partition movements still in progress)",
+                   retry_on_exc=True)
+
+        for n in initial_pool:
+            self.redpanda.stop_node(n)
+
+        self.verify()
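
Supplementary note (not part of the patch): a minimal, self-contained sketch of the `TestMode` helper introduced above, runnable on its own to show how the `str`-based enum drives the `has_tiered_storage` check. The `__main__` demo block is purely illustrative and is not taken from the repository.

```python
from enum import Enum


class TestMode(str, Enum):
    NO_TIERED_STORAGE = "no_tiered_storage"
    TIERED_STORAGE = "tiered_storage"
    FAST_MOVES = "tiered_storage_fast_moves"

    @property
    def has_tiered_storage(self):
        # Members of a str-based Enum compare equal to their string values,
        # so either comparison style works; comparing members directly is
        # the clearer form.
        return self in (TestMode.TIERED_STORAGE, TestMode.FAST_MOVES)


if __name__ == "__main__":
    # Only the two tiered-storage modes should report True.
    for mode in TestMode:
        print(f"{mode.value}: has_tiered_storage={mode.has_tiered_storage}")
    assert TestMode.TIERED_STORAGE == "tiered_storage"  # str-Enum equality
    assert not TestMode.NO_TIERED_STORAGE.has_tiered_storage
```

Membership comparison (`self in (...)`) and the value comparison used in the patch are equivalent here, because `str`-based enum members compare equal to their raw string values.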