diff --git a/src/v/archival/tests/archival_metadata_stm_gtest.cc b/src/v/archival/tests/archival_metadata_stm_gtest.cc index 21d28bcdc5d7..5e3211085260 100644 --- a/src/v/archival/tests/archival_metadata_stm_gtest.cc +++ b/src/v/archival/tests/archival_metadata_stm_gtest.cc @@ -231,17 +231,17 @@ TEST_F_CORO( 10s, [&reached_dispatch_append, &may_resume_append](raft::raft_node_instance& node) { - node.on_dispatch( - [&reached_dispatch_append, &may_resume_append](raft::msg_type t) { - if (t == raft::msg_type::append_entries) { - if (!reached_dispatch_append.available()) { - reached_dispatch_append.set_value(true); - } - return may_resume_append.get_shared_future(); - } - - return ss::now(); - }); + node.on_dispatch([&reached_dispatch_append, &may_resume_append]( + model::node_id, raft::msg_type t) { + if (t == raft::msg_type::append_entries) { + if (!reached_dispatch_append.available()) { + reached_dispatch_append.set_value(true); + } + return may_resume_append.get_shared_future(); + } + + return ss::now(); + }); return node.get_vnode(); }); diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index a71dd89f71b8..9f56d1994582 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -520,19 +520,34 @@ void consensus::maybe_promote_to_voter(vnode id) { return ss::now(); } - vlog(_ctxlog.trace, "promoting node {} to voter", id); - return _op_lock.get_units() - .then([this, id](ssx::semaphore_units u) mutable { - auto latest_cfg = _configuration_manager.get_latest(); - latest_cfg.promote_to_voter(id); + // do not promote if the previous configuration is still uncommitted, + // otherwise we may add several new voters in quick succession, that the + // old voters will not know of, resulting in a possibility of + // non-intersecting quorums. + if (_configuration_manager.get_latest_offset() > _commit_index) { + return ss::now(); + } - return replicate_configuration( - std::move(u), std::move(latest_cfg)); - }) - .then([this, id](std::error_code ec) { - vlog( - _ctxlog.trace, "node {} promotion result {}", id, ec.message()); - }); + return _op_lock.get_units().then([this, + id](ssx::semaphore_units u) mutable { + // check once more under _op_lock to protect against races with + // concurrent voter promotions. + if (_configuration_manager.get_latest_offset() > _commit_index) { + return ss::now(); + } + + vlog(_ctxlog.trace, "promoting node {} to voter", id); + auto latest_cfg = _configuration_manager.get_latest(); + latest_cfg.promote_to_voter(id); + return replicate_configuration(std::move(u), std::move(latest_cfg)) + .then([this, id](std::error_code ec) { + vlog( + _ctxlog.trace, + "node {} promotion result {}", + id, + ec.message()); + }); + }); }); } diff --git a/src/v/raft/tests/basic_raft_fixture_test.cc b/src/v/raft/tests/basic_raft_fixture_test.cc index 06846d61babb..40d8ce403fe8 100644 --- a/src/v/raft/tests/basic_raft_fixture_test.cc +++ b/src/v/raft/tests/basic_raft_fixture_test.cc @@ -273,7 +273,7 @@ TEST_P_CORO( co_await set_write_caching(params.write_caching); for (auto& [_, node] : nodes()) { - node->on_dispatch([](raft::msg_type t) { + node->on_dispatch([](model::node_id, raft::msg_type t) { if ( t == raft::msg_type::append_entries && random_generators::get_int(1000) > 800) { @@ -405,7 +405,7 @@ TEST_P_CORO(quorum_acks_fixture, test_progress_on_truncation) { // truncation. for (auto& [id, node] : nodes()) { if (id == leader_id) { - node->on_dispatch([](raft::msg_type t) { + node->on_dispatch([](model::node_id, raft::msg_type t) { if ( t == raft::msg_type::append_entries || t == raft::msg_type::vote) { diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index f5bfb6b6e8b5..608ca278b625 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -240,8 +240,7 @@ channel& in_memory_test_protocol::get_channel(model::node_id id) { return *it->second; } -void in_memory_test_protocol::on_dispatch( - ss::noncopyable_function(msg_type)> f) { +void in_memory_test_protocol::on_dispatch(dispatch_callback_t f) { _on_dispatch_handlers.push_back(std::move(f)); } @@ -301,7 +300,7 @@ in_memory_test_protocol::dispatch(model::node_id id, ReqT req) { const auto msg_type = map_msg_type(); for (const auto& f : _on_dispatch_handlers) { - co_await f(msg_type); + co_await f(id, msg_type); } try { @@ -557,8 +556,7 @@ raft_node_instance::random_batch_base_offset(model::offset max) { co_return batches.front().base_offset(); } -void raft_node_instance::on_dispatch( - ss::noncopyable_function(msg_type)> f) { +void raft_node_instance::on_dispatch(dispatch_callback_t f) { _protocol->on_dispatch(std::move(f)); } @@ -569,6 +567,9 @@ seastar::future<> raft_fixture::TearDownAsync() { co_await seastar::coroutine::parallel_for_each( _nodes, [](auto& pair) { return pair.second->stop(); }); + co_await seastar::coroutine::parallel_for_each( + _nodes, [](auto& pair) { return pair.second->remove_data(); }); + co_await _features.stop(); } seastar::future<> raft_fixture::SetUpAsync() { @@ -620,7 +621,7 @@ raft_fixture::stop_node(model::node_id id, remove_data_dir remove) { } raft_node_instance& raft_fixture::node(model::node_id id) { - return *_nodes.find(id)->second; + return *_nodes.at(id); } ss::future diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index 67d6d5734a75..ce2798952103 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -92,6 +92,9 @@ struct raft_node_map { node_for(model::node_id) = 0; }; +using dispatch_callback_t + = ss::noncopyable_function(model::node_id, msg_type)>; + class in_memory_test_protocol : public consensus_client_protocol::impl { public: explicit in_memory_test_protocol(raft_node_map&, prefix_logger&); @@ -129,7 +132,7 @@ class in_memory_test_protocol : public consensus_client_protocol::impl { channel& get_channel(model::node_id id); - void on_dispatch(ss::noncopyable_function(msg_type)> f); + void on_dispatch(dispatch_callback_t f); ss::future<> stop(); @@ -138,8 +141,7 @@ class in_memory_test_protocol : public consensus_client_protocol::impl { ss::future> dispatch(model::node_id, ReqT req); ss::gate _gate; absl::flat_hash_map> _channels; - std::vector(msg_type)>> - _on_dispatch_handlers; + std::vector _on_dispatch_handlers; raft_node_map& _nodes; prefix_logger& _logger; }; @@ -227,7 +229,7 @@ class raft_node_instance : public ss::weakly_referencable { /// //// \param f The callback function to be invoked when a message is /// dispatched. - void on_dispatch(ss::noncopyable_function(msg_type)> f); + void on_dispatch(dispatch_callback_t); private: model::node_id _id; diff --git a/src/v/raft/tests/raft_reconfiguration_test.cc b/src/v/raft/tests/raft_reconfiguration_test.cc index 797cd8917825..5781083d1bb0 100644 --- a/src/v/raft/tests/raft_reconfiguration_test.cc +++ b/src/v/raft/tests/raft_reconfiguration_test.cc @@ -25,6 +25,7 @@ #include "serde/serde.h" #include "storage/record_batch_builder.h" #include "test_utils/async.h" +#include "test_utils/randoms.h" #include "test_utils/test.h" #include @@ -35,6 +36,7 @@ #include #include +#include #include #include @@ -49,16 +51,30 @@ static ss::logger test_log("reconfiguration-test"); */ using use_snapshot = ss::bool_class; -using use_initial_learner_offset = ss::bool_class; -struct test_params { - use_snapshot snapshot; - int initial_size; - int nodes_to_add; - int nodes_to_remove; - use_initial_learner_offset learner_start_offset; - consistency_level consistency_level = raft::consistency_level::quorum_ack; +using use_initial_learner_offset + = ss::bool_class; + +enum class isolated_t { + none, + old_leader, + old_followers, + random, }; +std::ostream& operator<<(std::ostream& o, isolated_t pt) { + switch (pt) { + case isolated_t::none: + return o << "isolated::none"; + case isolated_t::old_leader: + return o << "isolated::old_leader"; + case isolated_t::old_followers: + return o << "isolated::old_followers"; + case isolated_t::random: + return o << "isolated::random"; + } + __builtin_unreachable(); +} + struct reconfiguration_test : testing::WithParamInterface> + consistency_level, + isolated_t>> , raft_fixture { - ss::future<> - wait_for_reconfiguration_to_finish(std::chrono::milliseconds timeout) { - RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this] { - for (auto& [_, n] : nodes()) { + ss::future<> wait_for_reconfiguration_to_finish( + const absl::flat_hash_set& target_ids, + std::chrono::milliseconds timeout) { + RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this, target_ids] { + for (auto id : target_ids) { + auto& n = node(id); if ( - n->raft()->config().get_state() + n.raft()->config().get_state() != raft::configuration_state::simple) { return false; } @@ -130,16 +149,16 @@ wait_for_offset(model::offset expected, raft_fixture::raft_nodes_t& nodes) { co_return result(raft::errc::timeout); } -void assert_offset_translator_state_is_consistent( - raft_fixture::raft_nodes_t& nodes) { - if (nodes.size() == 1) { +static void assert_offset_translator_state_is_consistent( + const std::vector& nodes) { + if (nodes.size() <= 1) { return; } model::offset start_offset{}; - auto first_raft = nodes.begin()->second->raft(); + auto first_raft = nodes.front()->raft(); model::offset dirty_offset = first_raft->dirty_offset(); // get the max start offset - for (auto& [id, n] : nodes) { + for (auto* n : nodes) { start_offset = std::max(n->raft()->start_offset(), start_offset); } std::vector deltas; @@ -152,46 +171,50 @@ void assert_offset_translator_state_is_consistent( auto idx = 0; for (model::offset o : boost::irange(start_offset, dirty_offset)) { - ASSERT_EQ( - it->second->raft()->log()->offset_delta(o), deltas[idx++]); + ASSERT_EQ((*it)->raft()->log()->offset_delta(o), deltas[idx++]); } } } TEST_P_CORO(reconfiguration_test, configuration_replace_test) { const auto param = GetParam(); - use_snapshot snapshot = std::get<0>(param); + auto snapshot = std::get(param); int initial_size = std::get<1>(param); int nodes_to_add = std::get<2>(param); int nodes_to_remove = std::get<3>(param); - use_initial_learner_offset use_learner_start_offset = std::get<4>(param); - consistency_level consistency_level = std::get<5>(param); + auto use_learner_start_offset = std::get(param); + auto consistency_lvl = std::get(param); + auto isolated = std::get(param); // skip test cases that makes no sense if ( nodes_to_add + initial_size - nodes_to_remove <= 0 || initial_size < nodes_to_remove) { co_return; } + if (initial_size == 1 && isolated == isolated_t::old_followers) { + co_return; + } fmt::print( "test parameters: {{snapshot: {}, initial_size: {}, " "nodes_to_add: {}, nodes_to_remove: {}, use_learner_start_offset: {}, " - "consistency_lvl: {}}}\n", + "consistency_lvl: {}, isolated: {}}}\n", snapshot, initial_size, nodes_to_add, nodes_to_remove, use_learner_start_offset, - consistency_level); + consistency_lvl, + isolated); // create group with initial configuration co_await create_simple_group(initial_size); // replicate batches auto result = co_await retry_with_leader( model::timeout_clock::now() + 30s, - [this, consistency_level](raft_node_instance& leader_node) { + [this, consistency_lvl](raft_node_instance& leader_node) { return leader_node.raft() ->replicate( - make_random_batches(), replicate_options(consistency_level)) + make_random_batches(), replicate_options(consistency_lvl)) .then([this](::result r) { if (!r) { return ss::make_ready_future<::result>( @@ -207,7 +230,7 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { auto& leader_node = node(leader); model::offset start_offset = leader_node.raft()->start_offset(); if (snapshot) { - if (consistency_level == consistency_level::leader_ack) { + if (consistency_lvl == consistency_level::leader_ack) { for (auto& [_, n] : nodes()) { co_await n->raft()->refresh_commit_index(); } @@ -249,9 +272,13 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { } } + auto old_node_ids = all_ids(); + + auto current_node_ids = old_node_ids; auto current_nodes = all_vnodes(); for (int i = 0; i < nodes_to_remove; ++i) { + current_node_ids.erase(current_nodes.back().id()); current_nodes.pop_back(); } absl::flat_hash_set added_nodes; @@ -260,6 +287,7 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { auto& n = add_node( model::node_id(initial_size + i), model::revision_id(0)); current_nodes.push_back(n.get_vnode()); + current_node_ids.insert(n.get_vnode().id()); added_nodes.emplace(n.get_vnode()); return n.init_and_start({}); }); @@ -267,6 +295,12 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { ASSERT_EQ_CORO( current_nodes.size(), initial_size + nodes_to_add - nodes_to_remove); + vlog( + test_log.info, + "dispatching reconfiguration: {} -> {}", + old_node_ids, + current_node_ids); + // update group configuration auto success = co_await retry_with_leader( model::timeout_clock::now() + 30s, @@ -282,10 +316,53 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { }); }); ASSERT_TRUE_CORO(success); + + auto isolated_nodes + = ss::make_lw_shared>(); + switch (isolated) { + case isolated_t::none: + break; + case isolated_t::old_leader: + isolated_nodes->insert(leader); + break; + case isolated_t::old_followers: + *isolated_nodes = old_node_ids; + isolated_nodes->erase(leader); + break; + case isolated_t::random: + for (auto n : all_ids()) { + if (tests::random_bool()) { + isolated_nodes->insert(n); + } + } + break; + } + + if (!isolated_nodes->empty()) { + vlog(test_log.info, "isolating nodes: {}", *isolated_nodes); + + for (const auto& [source_id, node] : nodes()) { + node->on_dispatch([=](model::node_id dest_id, raft::msg_type) { + if ( + isolated_nodes->contains(source_id) + != isolated_nodes->contains(dest_id)) { + return ss::sleep(5s); + } + return ss::now(); + }); + } + + // heal the partition 5s later + (void)ss::sleep(5s).then([isolated_nodes] { + vlog(test_log.info, "healing the network partition"); + isolated_nodes->clear(); + }); + } + co_await with_leader( - 30s, [this, consistency_level](raft_node_instance& leader_node) { + 30s, [this, consistency_lvl](raft_node_instance& leader_node) { // wait for committed offset to propagate - if (consistency_level == raft::consistency_level::quorum_ack) { + if (consistency_lvl == raft::consistency_level::quorum_ack) { return wait_for_committed_offset( leader_node.raft()->committed_offset(), 30s); } else { @@ -294,7 +371,7 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { } }); - co_await wait_for_reconfiguration_to_finish(30s); + co_await wait_for_reconfiguration_to_finish(current_node_ids, 30s); co_await assert_logs_equal(start_offset); @@ -302,8 +379,9 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { current_nodes.begin(), current_nodes.end()); // validate configuration - for (const auto& [_, n] : nodes()) { - auto cfg = n->raft()->config(); + for (auto id : current_node_ids) { + auto& n = node(id); + auto cfg = n.raft()->config(); auto cfg_vnodes = cfg.all_nodes(); ASSERT_EQ_CORO( current_nodes_set, @@ -312,11 +390,16 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { ASSERT_FALSE_CORO(cfg.old_config().has_value()); ASSERT_TRUE_CORO(cfg.current_config().learners.empty()); - if (learner_start_offset && added_nodes.contains(n->get_vnode())) { - ASSERT_EQ_CORO(n->raft()->start_offset(), learner_start_offset); + if (learner_start_offset && added_nodes.contains(n.get_vnode())) { + ASSERT_EQ_CORO(n.raft()->start_offset(), learner_start_offset); } } - assert_offset_translator_state_is_consistent(nodes()); + + std::vector current_node_ptrs; + for (auto id : current_node_ids) { + current_node_ptrs.push_back(&node(id)); + } + assert_offset_translator_state_is_consistent(current_node_ptrs); } INSTANTIATE_TEST_SUITE_P( @@ -329,4 +412,19 @@ INSTANTIATE_TEST_SUITE_P( testing::Values(0, 1, 3), // to remove testing::Values(use_initial_learner_offset::yes), testing::Values( - consistency_level::quorum_ack, consistency_level::leader_ack))); + consistency_level::quorum_ack, consistency_level::leader_ack), + testing::Values(isolated_t::none))); + +INSTANTIATE_TEST_SUITE_P( + reconfiguration_with_isolated_nodes, + reconfiguration_test, + testing::Combine( + testing::Values(use_snapshot::no), + testing::Values(3), // initial size + testing::Values(2), // to add + testing::Values(0, 2), // to remove + testing::Values(use_initial_learner_offset::yes), + testing::Values( + consistency_level::quorum_ack, consistency_level::leader_ack), + testing::Values( + isolated_t::old_followers, isolated_t::old_leader, isolated_t::random))); diff --git a/src/v/raft/tests/replication_monitor_tests.cc b/src/v/raft/tests/replication_monitor_tests.cc index d73e68c128d5..e6914a411170 100644 --- a/src/v/raft/tests/replication_monitor_tests.cc +++ b/src/v/raft/tests/replication_monitor_tests.cc @@ -90,7 +90,7 @@ TEST_P_CORO(monitor_test_fixture, truncation_detection) { for (auto& [id, node] : nodes()) { if (id == leader) { - node->on_dispatch([](raft::msg_type t) { + node->on_dispatch([](model::node_id, raft::msg_type t) { if ( t == raft::msg_type::append_entries || t == raft::msg_type::vote) {