From 9577c8ed4502158e450202933ccf69a35b53c7ac Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 22 Sep 2023 09:44:49 +0200 Subject: [PATCH] tests: added test verifying if periodic flushing works correctly Signed-off-by: Michal Maslanka (cherry picked from commit 0a7355187f6f28ab73331ef337e9f4d573d2c29e) --- .../rptest/tests/raft_periodic_flush_test.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 tests/rptest/tests/raft_periodic_flush_test.py diff --git a/tests/rptest/tests/raft_periodic_flush_test.py b/tests/rptest/tests/raft_periodic_flush_test.py new file mode 100644 index 0000000000000..01261dfe1d799 --- /dev/null +++ b/tests/rptest/tests/raft_periodic_flush_test.py @@ -0,0 +1,74 @@ +# Copyright 2022 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.services.admin import Admin +from rptest.services.cluster import cluster +from rptest.clients.types import TopicSpec +from rptest.tests.end_to_end import EndToEndTest +from rptest.util import wait_until + + +class PeriodicFlushWithRelaxedConsistencyTest(EndToEndTest): + @cluster(num_nodes=5) + def test_changing_periodic_flush_threshold(self): + + self.start_redpanda(num_nodes=3) + # set raft_max_not_flushed_bytes to high value + self.redpanda.set_cluster_config( + {"raft_replica_max_pending_flush_bytes": 10 * (1024 * 1024)}) + + # create topic with single partition + spec = TopicSpec(partition_count=1, replication_factor=3) + self.client().create_topic(spec) + + self.topic = spec.name + + self.start_producer(1, throughput=1000, acks=1) + self.start_consumer() + self.consumer.start() + msg_count = 10000 + wait_until( + lambda: len(self.producer.acked_values) >= msg_count, + timeout_sec=60, + backoff_sec=1, + err_msg=f"Producer didn't end producing {msg_count} messages") + # wait for at least 15000 records to be consumed + + self.producer.stop() + self.run_validation(min_records=msg_count, + producer_timeout_sec=300, + consumer_timeout_sec=300) + + admin = Admin(self.redpanda) + p_state = admin.get_partition_state(namespace='kafka', + topic=self.topic, + partition=0) + self.logger.info(f"initial partition state: {p_state}") + assert all([ + r['committed_offset'] < r['dirty_offset'] + for r in p_state['replicas'] + ]), "With ACKS=1, committed offset should not be advanced immediately" + + self.redpanda.set_cluster_config( + {"raft_replica_max_pending_flush_bytes": 1}) + + def committed_offset_advanced(): + p_state = admin.get_partition_state(namespace='kafka', + topic=self.topic, + partition=0) + + return all([ + r['committed_offset'] == r['dirty_offset'] + for r in p_state['replicas'] + ]) + + wait_until( + committed_offset_advanced, 30, 1, + "committed offset did not advance after the change of max flushed bytes" + )