From 89bb48f8702762d33b59a7c9b9710bde4a97478c Mon Sep 17 00:00:00 2001 From: drdr xp Date: Thu, 20 May 2021 15:30:07 +0800 Subject: [PATCH] fix: last_applied should be updated only when logs actually applied. --- .github/workflows/ci.yaml | 5 ++ async-raft/src/core/append_entries.rs | 14 ++-- .../metrics_state_machine_consistency.rs | 71 +++++++++++++++++++ 3 files changed, 82 insertions(+), 8 deletions(-) create mode 100644 async-raft/tests/metrics_state_machine_consistency.rs diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6a87bbad4..440b4c4af 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -26,6 +26,11 @@ jobs: with: command: test args: -p async-raft --test initialization + - name: Integration Test | metrics state machine consistency + uses: actions-rs/cargo@v1 + with: + command: test + args: -p async-raft --test metrics_state_machine_consistency - name: Integration Test | NonVoter restart uses: actions-rs/cargo@v1 with: diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index cf3594a69..ef00e0f0b 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -242,15 +242,13 @@ impl, S: RaftStorage> Ra } }) .collect(); - // If we actually have some cached entries to apply, then we optimistically update, as - // `self.last_applied` is held in-memory only, and if an error does come up, then - // Raft will go into shutdown. - if let Some(index) = last_entry_seen { - self.last_applied = index; - self.report_metrics(); - } + // If we have no data entries to apply, then do nothing. if entries.is_empty() { + if let Some(index) = last_entry_seen { + self.last_applied = index; + self.report_metrics(); + } return; } // Spawn task to replicate these entries to the state machine. @@ -260,7 +258,7 @@ impl, S: RaftStorage> Ra // interface a bit before 1.0. let entries_refs: Vec<_> = entries.iter().map(|(k, v)| (k, v)).collect(); storage.replicate_to_state_machine(&entries_refs).await?; - Ok(None) + Ok(last_entry_seen) }); self.replicate_to_sm_handle.push(handle); } diff --git a/async-raft/tests/metrics_state_machine_consistency.rs b/async-raft/tests/metrics_state_machine_consistency.rs new file mode 100644 index 000000000..890184faf --- /dev/null +++ b/async-raft/tests/metrics_state_machine_consistency.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use maplit::hashset; + +use async_raft::{Config, State}; +use fixtures::RaftRouter; + +mod fixtures; + +/// Cluster metrics_state_machine_consistency test. +/// +/// What does this test do? +/// +/// - brings 2 nodes online: one leader and one non-voter. +/// - write one log to the leader. +/// - asserts that when metrics.last_applied is upto date, the state machine should be upto date too. +/// +/// RUST_LOG=async_raft,memstore,metrics_state_machine_consistency=trace cargo test -p async-raft --test metrics_state_machine_consistency +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn metrics_state_machine_consistency() -> Result<()> { + fixtures::init_tracing(); + + // Setup test dependencies. + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); + let router = Arc::new(RaftRouter::new(config.clone())); + + router.new_raft_node(0).await; + router.new_raft_node(1).await; + + tracing::info!("--- initializing single node cluster"); + + // Wait for node 0 to become leader. + router.initialize_with(0, hashset![0]).await?; + router + .wait_for_metrics( + &0u64, + |x| x.state == State::Leader, + Duration::from_micros(100), + "n0.state -> Leader", + ) + .await; + + tracing::info!("--- add one non-voter"); + router.add_non_voter(0, 1).await?; + + tracing::info!("--- write one log"); + router.client_request(0, "foo", 1).await; + + // Wait for metrics to be up to date. + // Once last_applied updated, the key should be visible in state machine. + tracing::info!("--- wait for log to sync"); + let want = 2u64; + for node_id in 0..2 { + router + .wait_for_metrics( + &node_id, + |x| x.last_applied == want, + Duration::from_micros(100), + &format!("n{}.last_applied -> {}", node_id, want), + ) + .await; + + let sto = router.get_sto(&node_id).await; + assert!(sto.get_state_machine().await.client_status.get("foo").is_some()); + + } + + Ok(()) +}