From 1ae86796365daee73333c0040bd83aa0477da9de Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Tue, 7 Nov 2023 10:33:40 +0100 Subject: [PATCH] tests: added basic tests for leader_acks with failure injection Signed-off-by: Michal Maslanka --- tests/rptest/tests/simple_e2e_test.py | 66 +++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/rptest/tests/simple_e2e_test.py b/tests/rptest/tests/simple_e2e_test.py index 0de52c157efd2..6562f287421e9 100644 --- a/tests/rptest/tests/simple_e2e_test.py +++ b/tests/rptest/tests/simple_e2e_test.py @@ -7,12 +7,24 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 +import random +import threading +import time +from rptest.services.failure_injector import FailureInjector, FailureSpec from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec +from rptest.services.redpanda import SISettings from rptest.tests.end_to_end import EndToEndTest +from rptest.utils.mode_checks import skip_debug_mode class SimpleEndToEndTest(EndToEndTest): + def __init__(self, test_context, *args, **kwargs): + + super(SimpleEndToEndTest, self).__init__(test_context=test_context, + *args, + **kwargs) + @cluster(num_nodes=6) def test_correctness_while_evicitng_log(self): ''' @@ -69,3 +81,57 @@ def test_consumer_interruption(self): assert error is not None assert "Consumed from an unexpected" in str( error) or "is behind the current committed offset" in str(error) + + @skip_debug_mode + @cluster(num_nodes=6) + def test_leader_acks(self): + ev = threading.Event() + + def inject_failures(): + fi = FailureInjector(self.redpanda) + while not ev.is_set(): + + node = random.choice(self.redpanda.nodes) + fi.inject_failure( + FailureSpec(type=FailureSpec.FAILURE_KILL, + length=0, + node=node)) + time.sleep(5) + + # use small segment size to enable log eviction + self.start_redpanda(num_nodes=3, + si_settings=SISettings( + test_context=self.test_context, + fast_uploads=True), + extra_rp_conf={ + "log_segment_size": + 1048576, + "retention_bytes": + 5242880, + "default_topic_replications": + 3, + "raft_replica_max_pending_flush_bytes": + 1024 * 1024 * 1024 * 1024, + "raft_flush_timer_interval_ms": + 3000000 + }) + + spec = TopicSpec(name="verify-leader-ack", + partition_count=16, + replication_factor=3) + self.client().create_topic(spec) + + self.topic = spec.name + + self.start_producer(2, throughput=10000, acks=1) + + self.start_consumer(1) + self.await_startup() + thread = threading.Thread(target=inject_failures, daemon=True) + thread.start() + + self.run_validation(min_records=100000, + producer_timeout_sec=300, + consumer_timeout_sec=300) + ev.set() + thread.join()