diff --git a/tests/rptest/tests/leadership_transfer_test.py b/tests/rptest/tests/leadership_transfer_test.py
index f233c104a9a59..518fb7affe61c 100644
--- a/tests/rptest/tests/leadership_transfer_test.py
+++ b/tests/rptest/tests/leadership_transfer_test.py
@@ -16,6 +16,7 @@
 from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
 from ducktape.utils.util import wait_until
 from rptest.clients.kafka_cat import KafkaCat
+from rptest.clients.rpk import RpkTool
 from rptest.util import wait_until_result
 from rptest.clients.types import TopicSpec
 from rptest.services.admin import Admin
@@ -339,3 +340,181 @@ def all_partitions_present(num_nodes, per_node=None):
             expected_min = math.floor(expected_on_shard * 0.8)
             assert count >= expected_min, \
                 f"leader count on shard {s} ({count}) is < {expected_min}"
+
+
+class LeadershipPinningTest(RedpandaTest):
+    def __init__(self, test_context):
+        super(LeadershipPinningTest, self).__init__(
+            test_context=test_context,
+            num_brokers=6,
+            extra_rp_conf={
+                'enable_rack_awareness': True,
+            },
+        )
+
+    def setUp(self):
+        pass
+
+    RACK_LAYOUT = ['A', 'A', 'B', 'B', 'C', 'C']
+
+    def _get_topic2node2leaders(self):
+        kc = KafkaCat(self.redpanda)
+        md = kc.metadata()
+        ret = dict()
+        for topic in md["topics"]:
+            name = topic["topic"]
+            node2leaders = dict(
+                collections.Counter(p["leader"] for p in topic["partitions"]))
+            self.logger.debug(
+                f"topic {name} leaders: {sorted(node2leaders.items())}")
+
+            ret[name] = node2leaders
+        return ret
+
+    def _rack_counts(self, node_counts):
+        rack2count = dict()
+        for ix, node in enumerate(self.redpanda.nodes):
+            node_id = self.redpanda.node_id(node)
+            leaders = node_counts.get(node_id, 0)
+            if leaders > 0:
+                rack = self.RACK_LAYOUT[ix]
+                rack2count[rack] = rack2count.setdefault(rack, 0) + leaders
+        return rack2count
+
+    def wait_for_racks(self,
+                       partition_counts,
+                       topic2expected_racks,
+                       check_balance=True,
+                       timeout_sec=60):
+        def predicate():
+            t2n2l = self._get_topic2node2leaders()
+
+            for topic, expected_count in partition_counts.items():
+                node2leaders = t2n2l.get(topic, dict())
+
+                count = sum(node2leaders.values())
+                if count != expected_count:
+                    self.logger.debug(
+                        f"not all leaders for topic {topic} present, "
+                        f"expected {expected_count}, got {count}")
+                    return False
+
+                expected_racks = topic2expected_racks.get(topic, {})
+                rack2leaders = self._rack_counts(node2leaders)
+
+                if expected_racks != rack2leaders.keys():
+                    self.logger.debug(
+                        f"leader rack sets for topic {topic} differ, "
+                        f"expected: {expected_racks}, actual counts: {rack2leaders}"
+                    )
+                    return False
+
+                if check_balance:
+                    nonzero_counts = [
+                        l for l in node2leaders.values() if l > 0
+                    ]
+                    if min(nonzero_counts) + 2 < max(nonzero_counts):
+                        self.logger.debug(
+                            f"leader counts unbalanced for topic {topic}: "
+                            f"{sorted(node2leaders.items())}")
+                        return False
+
+            return True
+
+        wait_until(predicate, timeout_sec=timeout_sec, backoff_sec=5)
+
+    @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    def test_leadership_pinning(self):
+        for ix, node in enumerate(self.redpanda.nodes):
+            self.redpanda.set_extra_node_conf(node, {
+                'rack': self.RACK_LAYOUT[ix],
+            })
+        self.redpanda.add_extra_rp_conf(
+            {'default_leaders_preference': "racks: A"})
+        self.redpanda.start()
+
+        rpk = RpkTool(self.redpanda)
+
+        partition_counts = {"foo": 60, "bar": 20}
+
+        self.logger.info("creating topics")
+
+        rpk.create_topic("foo", partitions=60, replicas=3)
+        rpk.create_topic("bar",
+                         partitions=20,
+                         replicas=3,
+                         config={"redpanda.leaders.preference": "racks: C"})
+
+        # bigger timeout to allow balancer to activate, health reports to propagate, etc.
+        self.wait_for_racks(partition_counts, {
+            "foo": {"A"},
+            "bar": {"C"}
+        },
+                            timeout_sec=90)
+
+        self.logger.info("altering topic preference")
+
+        rpk.alter_topic_config("bar", "redpanda.leaders.preference",
+                               "racks: B, C")
+
+        self.wait_for_racks(partition_counts, {
+            "foo": {"A"},
+            "bar": {"B", "C"}
+        },
+                            timeout_sec=30)
+
+        # Decrease idle timeout to not wait too long after nodes are killed
+        self.redpanda.set_cluster_config({"enable_leader_balancer": False})
+        self.redpanda.set_cluster_config(
+            {"leader_balancer_idle_timeout": 20000})
+        self.redpanda.set_cluster_config({"enable_leader_balancer": True})
+
+        self.logger.info("killing rack B")
+
+        for ix, node in enumerate(self.redpanda.nodes):
+            if self.RACK_LAYOUT[ix] == "B":
+                self.redpanda.stop_node(node)
+
+        self.wait_for_racks(partition_counts, {
+            "foo": {"A"},
+            "bar": {"C"}
+        },
+                            timeout_sec=60)
+
+        self.logger.info("explicitly disabling for topic")
+        rpk.alter_topic_config("foo", "redpanda.leaders.preference", "none")
+
+        # There is cross-talk between partition counts of foo and bar, so we don't
+        # require balanced counts.
+        self.wait_for_racks(partition_counts, {
+            "foo": {"A", "C"},
+            "bar": {"C"}
+        },
+                            check_balance=False,
+                            timeout_sec=30)
+
+        self.logger.info("unset topic configs")
+
+        rpk.delete_topic_config("foo", "redpanda.leaders.preference")
+        rpk.delete_topic_config("bar", "redpanda.leaders.preference")
+
+        self.wait_for_racks(partition_counts, {
+            "foo": {"A"},
+            "bar": {"A"}
+        },
+                            timeout_sec=30)
+
+        self.logger.info("unset default preference")
+
+        for ix, node in enumerate(self.redpanda.nodes):
+            if self.RACK_LAYOUT[ix] == "B":
+                self.redpanda.start_node(node)
+
+        self.redpanda.set_cluster_config(
+            {"default_leaders_preference": "none"})
+        self.wait_for_racks(partition_counts, {
+            "foo": {"A", "B", "C"},
+            "bar": {"A", "B", "C"}
+        },
+                            check_balance=False,
+                            timeout_sec=90)