From 447dc11cab51fb3b1925177d13e4dd89f998837b Mon Sep 17 00:00:00 2001 From: drdr xp Date: Tue, 13 Jul 2021 16:09:40 +0800 Subject: [PATCH] fix: when finalize_snapshot_installation, memstore should not load membership from its old log that are going to be overridden by snapshot. --- .../tests/snapshot_overrides_membership.rs | 145 ++++++++++++++++++ memstore/src/lib.rs | 14 +- 2 files changed, 149 insertions(+), 10 deletions(-) create mode 100644 async-raft/tests/snapshot_overrides_membership.rs diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs new file mode 100644 index 000000000..fe9764513 --- /dev/null +++ b/async-raft/tests/snapshot_overrides_membership.rs @@ -0,0 +1,145 @@ +mod fixtures; + +use std::sync::Arc; + +use anyhow::Result; +use async_raft::raft::AppendEntriesRequest; +use async_raft::raft::Entry; +use async_raft::raft::EntryConfigChange; +use async_raft::raft::EntryPayload; +use async_raft::raft::MembershipConfig; +use async_raft::Config; +use async_raft::LogId; +use async_raft::RaftNetwork; +use async_raft::RaftStorage; +use async_raft::SnapshotPolicy; +use async_raft::State; +use fixtures::RaftRouter; +use maplit::hashset; + +/// Test membership info is sync correctly along with snapshot. +/// +/// What does this test do? +/// +/// - build a stable single node cluster. +/// - send enough requests to the node that log compaction will be triggered. +/// - ensure that snapshot overrides the existent membership on the non-voter. +/// +/// export RUST_LOG=async_raft,memstore,snapshot_overrides_membership=trace +/// cargo test -p async-raft --test snapshot_overrides_membership +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn snapshot_overrides_membership() -> Result<()> { + fixtures::init_tracing(); + + let snapshot_threshold: u64 = 10; + + let config = Arc::new( + Config::build("test".into()) + .snapshot_policy(SnapshotPolicy::LogsSinceLast(snapshot_threshold)) + .validate() + .expect("failed to build Raft config"), + ); + let router = Arc::new(RaftRouter::new(config.clone())); + + let mut want = 0; + + tracing::info!("--- initializing cluster"); + { + router.new_raft_node(0).await; + + router.wait_for_log(&hashset![0], want, None, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?; + router.initialize_from_single_node(0).await?; + want += 1; + + router.wait_for_log(&hashset![0], want, None, "init leader").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; + } + + tracing::info!("--- send just enough logs to trigger snapshot"); + { + router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await; + want = snapshot_threshold; + + router.wait_for_log(&hashset![0], want, None, "send log to trigger snapshot").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; + + router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?; + router + .assert_storage_state( + 1, + want, + Some(0), + want, + Some((want.into(), 1, MembershipConfig { + members: hashset![0], + members_after_consensus: None, + })), + ) + .await; + } + + tracing::info!("--- create non-voter"); + { + tracing::info!("--- create non-voter"); + router.new_raft_node(1).await; + let sto = router.get_storage_handle(&1).await?; + + tracing::info!("--- add a membership config log to the non-voter"); + { + let req = AppendEntriesRequest { + term: 1, + leader_id: 0, + prev_log_id: Default::default(), + entries: vec![Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::ConfigChange(EntryConfigChange { + membership: MembershipConfig { + members: hashset![2, 3], + members_after_consensus: None, + }, + }), + }], + leader_commit: 0, + }; + router.append_entries(1, req).await?; + + tracing::info!("--- check that non-voter membership is affected"); + { + let m = sto.get_membership_config().await?; + assert_eq!( + MembershipConfig { + members: hashset![2, 3], + members_after_consensus: None + }, + m + ); + } + } + + tracing::info!("--- add non-voter to the cluster to receive snapshot, which overrides the non-voter storage"); + { + router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); + + router.wait_for_log(&hashset![0, 1], want, None, "add non-voter").await?; + let expected_snap = Some((want.into(), 1, MembershipConfig { + members: hashset![0u64], + members_after_consensus: None, + })); + router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?; + router.assert_storage_state(1, want, None /* non-voter does not vote */, want, expected_snap).await; + + let m = sto.get_membership_config().await?; + assert_eq!( + MembershipConfig { + members: hashset![0], + members_after_consensus: None + }, + m, + "membership should be overridden by the snapshot" + ); + } + } + + Ok(()) +} diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 3b5a20f28..964131687 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -396,15 +396,6 @@ impl RaftStorage for MemStore { { // Go backwards through the log to find the most recent membership config <= the `through` index. let mut log = self.log.write().await; - let membership_config = log - .values() - .rev() - .skip_while(|entry| entry.log_id.index > index) - .find_map(|entry| match &entry.payload { - EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()), - _ => None, - }) - .unwrap_or_else(|| MembershipConfig::new_initial(self.id)); match &delete_through { Some(through) => { @@ -412,7 +403,10 @@ impl RaftStorage for MemStore { } None => log.clear(), } - log.insert(index, Entry::new_snapshot_pointer(index, term, id, membership_config)); + log.insert( + index, + Entry::new_snapshot_pointer(index, term, id, new_snapshot.meta.membership.clone()), + ); } // Update the state machine.