tests: added tiered storage to node pool migration test
Extended the node pool migration test with tiered storage and different
cleanup policies. Additionally, added a mode in which moves are executed
using fast partition movements.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
mmaslankaprv committed Nov 9, 2023
1 parent 1bb532d commit 7217444
Showing 1 changed file with 122 additions and 23 deletions.
145 changes: 122 additions & 23 deletions tests/rptest/tests/node_pool_migration_test.py
@@ -8,28 +8,41 @@
# by the Apache License, Version 2.0

from concurrent.futures import ThreadPoolExecutor
from math import ceil, floor
import random
from threading import Thread
import re

import requests
from rptest.clients.kafka_cat import KafkaCat
from time import sleep
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.tests.prealloc_nodes import PreallocNodesTest

from rptest.clients.rpk import RpkTool
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from ducktape.mark import matrix
from rptest.clients.types import TopicSpec
from rptest.tests.end_to_end import EndToEndTest
from rptest.services.admin import Admin
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RESTART_LOG_ALLOW_LIST, RedpandaService, make_redpanda_service
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings
from rptest.utils.mode_checks import cleanup_on_early_exit
from rptest.utils.node_operations import NodeDecommissionWaiter
from enum import Enum

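# log lines that are expected when tiered-storage topics are deleted or
# reconfigured while uploads are still in flight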
TS_LOG_ALLOW_LIST = [
re.compile(
"archival_metadata_stm.*Replication wait for archival STM timed out"),
# topic deletion may happen before data was uploaded
re.compile("cloud_storage.*Failed to fetch manifest during finalize().*")
]


class TestMode(str, Enum):
NO_TIERED_STORAGE = "no_tiered_storage"
TIERED_STORAGE = "tiered_storage"
FAST_MOVES = "tiered_storage_fast_moves"

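# both the tiered-storage and the fast-moves modes run with tiered storage
# enabled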
@property
def has_tiered_storage(self):
return self in (TestMode.TIERED_STORAGE, TestMode.FAST_MOVES)


class NodePoolMigrationTest(PreallocNodesTest):
@@ -39,9 +52,14 @@ class NodePoolMigrationTest(PreallocNodesTest):
def __init__(self, test_context):
self._topic = None

super(NodePoolMigrationTest, self).__init__(test_context=test_context,
num_brokers=10,
node_prealloc_count=1)
super(NodePoolMigrationTest, self).__init__(
test_context=test_context,
num_brokers=10,
node_prealloc_count=1,
si_settings=SISettings(test_context,
cloud_storage_enable_remote_read=True,
cloud_storage_enable_remote_write=True,
fast_uploads=True))

def setup(self):
# defer starting redpanda to test body
@@ -70,10 +88,18 @@ def _create_topics(self, replication_factors=[1, 3]):
for spec in topics:
self.client().create_topic(spec)

self._topic = random.choice(topics).name

return total_partitions

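# dedicated topic for the kgo-verifier workload; the cleanup policy is
# parametrized by the test and segments are kept small so retention and
# tiered storage uploads kick in quickly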
def _create_workload_topic(self, cleanup_policy):
spec = TopicSpec(name="migration-test-workload",
partition_count=8,
replication_factor=3,
cleanup_policy=cleanup_policy,
segment_bytes=self.segment_size)

self.client().create_topic(spec)
self._topic = spec.name

# after a node is removed, the state should be consistent on all remaining nodes
def _check_state_consistent(self, decommissioned_id):

@@ -159,16 +185,25 @@ def decommissioned():

@property
def msg_size(self):
return 64
return 4096

@property
def msg_count(self):
return int(20 * self.producer_throughput / self.msg_size)
return int(100 if self.debug_mode else 5000 * self.segment_size /
self.msg_size)

@property
def producer_throughput(self):
return 1024 if self.debug_mode else 1024 * 1024

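# small segments and a local retention target of only a few segments keep
# most of the data out of local storage once strict local retention and
# tiered storage are enabled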
@property
def segment_size(self):
return 1024 * 1024

@property
def local_retention_bytes(self):
return 4 * self.segment_size

def start_producer(self):
self.logger.info(
f"starting kgo-verifier producer with {self.msg_count} messages of size {self.msg_size} and throughput: {self.producer_throughput} bps"
@@ -179,8 +214,9 @@ def start_producer(self):
self._topic,
self.msg_size,
self.msg_count,
key_set_cardinality=10000,
custom_node=self.preallocated_nodes,
rate_limit_bps=self.producer_throughput)
debug_logs=True)

self.producer.start(clean=False)

@@ -232,23 +268,69 @@ def _replicas_per_node(self):

return node_replicas

@cluster(num_nodes=11, log_allow_list=RESTART_LOG_ALLOW_LIST)
@matrix(balancing_mode=['off', 'node_add'])
def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode):
@cluster(num_nodes=11,
log_allow_list=RESTART_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
@matrix(balancing_mode=['off', 'node_add'],
test_mode=[
TestMode.NO_TIERED_STORAGE, TestMode.TIERED_STORAGE,
TestMode.FAST_MOVES
],
cleanup_policy=["compact", "compact,delete"])
def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode,
test_mode: TestMode,
cleanup_policy):
'''
This test migrates a Redpanda cluster from one set of nodes to another:
nodes from the target pool are first added to the cluster and then the old
pool of nodes is decommissioned.
'''

if self.debug_mode:
self.redpanda._si_settings = None
cleanup_on_early_exit(self)
return

initial_pool = self.redpanda.nodes[0:5]
new_pool = self.redpanda.nodes[5:]

self.redpanda.set_seed_servers(initial_pool)

# start redpanda on initial pool of nodes
self.redpanda.start(nodes=initial_pool,
auto_assign_node_id=True,
omit_seeds_on_idx_one=False)
self.admin.patch_cluster_config(
upsert={"partition_autobalancing_mode": balancing_mode})

cfg = {"partition_autobalancing_mode": balancing_mode}
if test_mode.has_tiered_storage:
cfg["cloud_storage_enable_remote_write"] = True
cfg["cloud_storage_enable_remote_read"] = True
# enforce strict local retention so data is actually deleted locally
cfg["retention_local_strict"] = True

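# fast partition moves: limit the amount of locally retained data a new
# replica has to fetch when a partition is moved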
if test_mode == TestMode.FAST_MOVES:
self.redpanda.set_cluster_config({
"initial_retention_local_target_bytes_default":
3 * self.segment_size
})

self.admin.patch_cluster_config(upsert=cfg)

self._create_topics()

self._create_workload_topic(cleanup_policy=cleanup_policy)
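# with tiered storage enabled, cap the workload topic's local retention so
# data beyond the local target is only served from cloud storage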
if test_mode.has_tiered_storage:
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(
self._topic, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
self.local_retention_bytes)

self.start_producer()
self.start_consumer()

# wait for roughly 50 segments worth of messages before executing node operations
self.producer.wait_for_acks(50 * self.segment_size // self.msg_size,
timeout_sec=60,
backoff_sec=2)
# add new nodes to the cluster
self.redpanda.for_nodes(
new_pool,
@@ -283,8 +365,6 @@ def all_nodes_present():

self._wait_for_nodes_removed(decommissioned_ids)

self.verify()

def _all_nodes_balanced():
r_per_node = self._replicas_per_node()
self.logger.info(f"finished with {r_per_node} replicas per node")
@@ -299,3 +379,22 @@ def _all_nodes_balanced():

wait_until(_all_nodes_balanced, 60, 1,
f"Partitions are not balanced correctly")

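# before shutting down the old pool, wait until there are no pending
# reconfigurations and the partition balancer is idle (ready) or disabled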
def _quiescent_state():
pb_status = self.admin.get_partition_balancer_status(
node=random.choice(new_pool))
reconfigurations = self.admin.list_reconfigurations(
node=random.choice(new_pool))
return len(reconfigurations) == 0 and pb_status['status'] in ('ready', 'off')

wait_until(_quiescent_state,
120,
1,
"Cluster did not reach a quiescent state (partition movements still pending)",
retry_on_exc=True)

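# the old pool is no longer part of the cluster; stop its nodes and verify
# the produced data end to end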
for n in initial_pool:
self.redpanda.stop_node(n)

self.verify()
