From 53b82ba9459934b05895674f3e8374a3531a5716 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Thu, 11 Apr 2024 16:46:55 -0700
Subject: [PATCH] replication_monitor: fix missed notifications during race

If the replication notification happens before wait_for_majority() is
called, the waiter is never resolved. Adds an additional check before
creating the waiter instance.
---
Review notes (placed after the three-dash line, so they are not part of
the commit message):

* What the change does: both wait_until_committed() and
  wait_until_majority_replicated() now test the completion condition
  up front and return an already-resolved future via ssx::now() instead
  of calling do_wait_until(). The new single_node_replication test
  replicates one batch on a one-node group and waits for the committed
  offset.
* The message above says wait_for_majority(); the functions actually
  touched by this diff are wait_until_committed() and
  wait_until_majority_replicated().
* NOTE(review): the copy this patch was recovered from had its line
  breaks collapsed and text in angle brackets removed. Line structure
  was rebuilt from the hunk headers (11 -> 18 and 6 -> 24 lines) and
  the diffstat. Return types are written as ss::future<errc>, inferred
  from `return ssx::now(errc::success);` - confirm against the tree.
* NOTE(review): indentation of context and added lines (4 spaces, 2 for
  continuation lines) is presumed; if the hunks do not apply cleanly,
  retry with whitespace tolerance.
* NOTE(review): the From: header has no e-mail address in the recovered
  copy; it could not be derived and must be re-added before mailing.

 src/v/raft/replication_monitor.cc           |  7 +++++++
 src/v/raft/tests/basic_raft_fixture_test.cc | 18 ++++++++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/src/v/raft/replication_monitor.cc b/src/v/raft/replication_monitor.cc
index 0f8ffaba327a..933ba5754003 100644
--- a/src/v/raft/replication_monitor.cc
+++ b/src/v/raft/replication_monitor.cc
@@ -101,11 +101,18 @@ ss::future<errc> replication_monitor::do_wait_until(
 
 ss::future<errc>
 replication_monitor::wait_until_committed(storage::append_result append) {
+    auto done = is_append_committed_or_truncated(append);
+    if (done) {
+        return ssx::now(done.value());
+    }
     return do_wait_until(append, wait_type::commit);
 }
 
 ss::future<errc> replication_monitor::wait_until_majority_replicated(
   storage::append_result append) {
+    if (is_append_replicated(append)) {
+        return ssx::now(errc::success);
+    }
     return do_wait_until(append, wait_type::majority_replication);
 }
 
diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc
index 40d8ce403fe8..76c0794cf33c 100644
--- a/src/v/raft/tests/basic_raft_fixture_test.cc
+++ b/src/v/raft/tests/basic_raft_fixture_test.cc
@@ -120,6 +120,24 @@ TEST_P_CORO(all_acks_fixture, validate_replication) {
     co_await assert_logs_equal();
 }
 
+TEST_P_CORO(all_acks_fixture, single_node_replication) {
+    co_await create_simple_group(1);
+
+    auto params = GetParam();
+    co_await set_write_caching(params.write_caching);
+
+    auto leader = co_await wait_for_leader(10s);
+    auto& leader_node = node(leader);
+
+    auto result = co_await leader_node.raft()->replicate(
+      make_batches({{"k_1", "v_1"}}), replicate_options(params.c_lvl));
+    ASSERT_TRUE_CORO(result.has_value());
+
+    // wait for committed offset to propagate
+    co_await wait_for_committed_offset(result.value().last_offset, 5s);
+    co_await assert_logs_equal();
+}
+
 TEST_P_CORO(all_acks_fixture, validate_recovery) {
     co_await create_simple_group(5);
     auto leader = co_await wait_for_leader(10s);