diff --git a/Cargo.lock b/Cargo.lock index 25a5598d3d5..c22de95a5d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1013,7 +1013,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.1" -source = "git+https://github.com/pingcap/kvproto.git?branch=release-3.0#7809473e1b70f4bed152e685716d683de30a1de0" +source = "git+https://github.com/pingcap/kvproto.git?branch=release-3.0#053b13a917325c1cf6e9ada7511ac8f46994bf58" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "grpcio 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/raftstore/store/fsm/apply.rs b/src/raftstore/store/fsm/apply.rs index e134fd69064..3add24b512a 100644 --- a/src/raftstore/store/fsm/apply.rs +++ b/src/raftstore/store/fsm/apply.rs @@ -1097,6 +1097,10 @@ impl ApplyDelegate { ctx: &ApplyContext, req: &RaftCmdRequest, ) -> Result<(RaftCmdResponse, ApplyResult)> { + fail_point!("on_apply_write_cmd", self.id() == 3, |_| { + unimplemented!(); + }); + let requests = req.get_requests(); let mut responses = Vec::with_capacity(requests.len()); diff --git a/src/raftstore/store/fsm/peer.rs b/src/raftstore/store/fsm/peer.rs index b6232b72837..f3922cdf108 100644 --- a/src/raftstore/store/fsm/peer.rs +++ b/src/raftstore/store/fsm/peer.rs @@ -2254,7 +2254,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { // ReadIndex can be processed on the replicas. 
let is_read_index_request = request.len() == 1 && request[0].get_cmd_type() == CmdType::ReadIndex; - if !(self.fsm.peer.is_leader() || is_read_index_request) { + let mut read_only = true; + for r in msg.get_requests() { + match r.get_cmd_type() { + CmdType::Get | CmdType::Snap | CmdType::ReadIndex => (), + _ => read_only = false, + } + } + let allow_follower_read = read_only && msg.get_header().get_follower_read(); + if !(self.fsm.peer.is_leader() || is_read_index_request || allow_follower_read) { self.ctx.raft_metrics.invalid_proposal.not_leader += 1; let leader = self.fsm.peer.get_peer_from_cache(leader_id); self.fsm.group_state = GroupState::Chaos; diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs index 8b28100b310..35f4a44f978 100644 --- a/src/raftstore/store/peer.rs +++ b/src/raftstore/store/peer.rs @@ -1009,6 +1009,14 @@ impl Peer { && !self.is_merging() } + fn ready_to_handle_unsafe_follower_read(&self, read_index: u64) -> bool { + // Ready once this peer has applied up to `read_index` and is neither splitting nor + // merging. Still unsafe: if the leader has applied less than this follower, a follower + // read can return a newer value and a later leader read a stale one, which violates + // linearizability. NOTE(review): the caller passes `read.read_index.unwrap()` - confirm it is always Some. + self.get_store().applied_index() >= read_index && !self.is_splitting() && !self.is_merging() + } + #[inline] fn is_splitting(&self) -> bool { self.last_committed_split_idx > self.get_store().applied_index() @@ -1354,18 +1362,24 @@ impl Peer { && read.cmds[0].0.get_requests().len() == 1 && read.cmds[0].0.get_requests()[0].get_cmd_type() == CmdType::ReadIndex; - if !is_read_index_request { - let term = self.term(); - // Only read index request is valid. - for (_, cb) in read.cmds.drain(..) { - apply::notify_stale_req(term, cb); - } - } else { + let term = self.term(); + if is_read_index_request { for (req, cb) in read.cmds.drain(..) 
{ cb.invoke_read(self.handle_read(ctx, req, true, read.read_index)); } + self.pending_reads.ready_cnt -= 1; + } else if self.ready_to_handle_unsafe_follower_read(read.read_index.unwrap()) { + for (req, cb) in read.cmds.drain(..) { + if req.get_header().get_follower_read() { + cb.invoke_read(self.handle_read(ctx, req, true, read.read_index)); + } else { + apply::notify_stale_req(term, cb); + } + } + self.pending_reads.ready_cnt -= 1; + } else { + self.pending_reads.reads.push_front(read); } - self.pending_reads.ready_cnt -= 1; } } } diff --git a/src/storage/kv/raftkv.rs b/src/storage/kv/raftkv.rs index 9d40deba16e..b6c34654d01 100644 --- a/src/storage/kv/raftkv.rs +++ b/src/storage/kv/raftkv.rs @@ -171,6 +171,7 @@ impl RaftKv { header.set_term(ctx.get_term()); } header.set_sync_log(ctx.get_sync_log()); + header.set_follower_read(ctx.get_follower_read()); header } diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs index 40dd7eed18a..892601cd4d5 100644 --- a/tests/failpoints/cases/mod.rs +++ b/tests/failpoints/cases/mod.rs @@ -3,6 +3,7 @@ mod test_bootstrap; mod test_conf_change; mod test_coprocessor; +mod test_follower_read; mod test_merge; mod test_pending_peers; mod test_snap; diff --git a/tests/failpoints/cases/test_follower_read.rs b/tests/failpoints/cases/test_follower_read.rs new file mode 100644 index 00000000000..e3e7bb7643e --- /dev/null +++ b/tests/failpoints/cases/test_follower_read.rs @@ -0,0 +1,65 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::sync::Arc; + +use fail; +use std::time::Duration; +use test_raftstore::*; +use tikv_util::HandyRwLock; + +#[test] +fn test_wait_for_apply_index() { + let _guard = crate::setup(); + let mut cluster = new_server_cluster(0, 3); + + // Increase the election tick to make this test case run reliably. 
+ configure_for_lease_read(&mut cluster, Some(50), Some(10_000)); + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + let r1 = cluster.run_conf_change(); + cluster.must_put(b"k0", b"v0"); + let p2 = new_peer(2, 2); + cluster.pd_client.must_add_peer(r1, p2.clone()); + let p3 = new_peer(3, 3); + cluster.pd_client.must_add_peer(r1, p3.clone()); + must_get_equal(&cluster.get_engine(3), b"k0", b"v0"); + + let region = cluster.get_region(b"k0"); + cluster.must_transfer_leader(region.get_id(), p2.clone()); + + // Stall the applying of all write cmds on Peer 3. + fail::cfg("on_apply_write_cmd", "sleep(2000)").unwrap(); + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + + // Peer 3 has not applied the cmd that puts 'k1' yet, so the follower read must + // be blocked. + must_get_none(&cluster.get_engine(3), b"k1"); + let mut request = new_request( + region.get_id(), + region.get_region_epoch().clone(), + vec![new_get_cf_cmd("default", b"k1")], + false, + ); + request.mut_header().set_peer(p3.clone()); + request.mut_header().set_follower_read(true); + let (cb, rx) = make_cb(&request); + cluster + .sim + .rl() + .async_command_on_node(3, request, cb) + .unwrap(); + // Must time out here + assert!(rx.recv_timeout(Duration::from_millis(500)).is_err()); + fail::cfg("on_apply_write_cmd", "off").unwrap(); + + // After the write cmd is applied, the follower read will be executed. 
+ match rx.recv_timeout(Duration::from_secs(3)) { + Ok(resp) => { + assert_eq!(resp.get_responses().len(), 1); + assert_eq!(resp.get_responses()[0].get_get().get_value(), b"v1"); + } + Err(_) => panic!("follower read failed"), + } +} diff --git a/tests/integrations/storage/test_raftkv.rs b/tests/integrations/storage/test_raftkv.rs index 378e61caa72..8713fd66550 100644 --- a/tests/integrations/storage/test_raftkv.rs +++ b/tests/integrations/storage/test_raftkv.rs @@ -4,6 +4,8 @@ use kvproto::kvrpcpb::Context; use engine::IterOption; use engine::{CfName, CF_DEFAULT}; +use std::thread; +use std::time; use test_raftstore::*; use tikv::storage::kv::*; use tikv::storage::{CFStatistics, Key}; @@ -123,6 +125,66 @@ fn test_read_index_on_replica() { ); } +#[test] +fn test_read_on_replica() { + let count = 3; + let mut cluster = new_server_cluster(0, count); + cluster.run(); + + let k1 = b"k1"; + let (k2, v2) = (b"k2", b"v2"); + let (k3, v3) = (b"k3", b"v3"); + let (k4, v4) = (b"k4", b"v4"); + + // make sure a leader has been elected. 
+ assert_eq!(cluster.must_get(k1), None); + + let region = cluster.get_region(b""); + let leader = cluster.leader_of_region(region.get_id()).unwrap(); + let leader_storage = cluster.sim.rl().storages[&leader.get_id()].clone(); + + let mut leader_ctx = Context::new(); + leader_ctx.set_region_id(region.get_id()); + leader_ctx.set_region_epoch(region.get_region_epoch().clone()); + leader_ctx.set_peer(leader.clone()); + + // write some data + let peers = region.get_peers(); + assert_none(&leader_ctx, &leader_storage, k2); + must_put(&leader_ctx, &leader_storage, k2, v2); + + // read on follower + let mut follower_peer = None; + let mut follower_id = 0; + for p in peers { + if p.get_id() != leader.get_id() { + follower_id = p.get_id(); + follower_peer = Some(p.clone()); + break; + } + } + + assert!(follower_peer.is_some()); + let mut follower_ctx = Context::new(); + follower_ctx.set_region_id(region.get_id()); + follower_ctx.set_region_epoch(region.get_region_epoch().clone()); + follower_ctx.set_peer(follower_peer.as_ref().unwrap().clone()); + follower_ctx.set_follower_read(true); + let follower_storage = cluster.sim.rl().storages[&follower_id].clone(); + assert_has(&follower_ctx, &follower_storage, k2, v2); + + must_put(&leader_ctx, &leader_storage, k3, v3); + assert_has(&follower_ctx, &follower_storage, k3, v3); + + cluster.stop_node(follower_id); + must_put(&leader_ctx, &leader_storage, k4, v4); + cluster.run_node(follower_id).unwrap(); + let follower_storage = cluster.sim.rl().storages[&follower_id].clone(); + // sleep to ensure the follower has received a heartbeat from the leader + thread::sleep(time::Duration::from_millis(300)); + assert_has(&follower_ctx, &follower_storage, k4, v4); +} + fn must_put(ctx: &Context, engine: &E, key: &[u8], value: &[u8]) { engine.put(ctx, Key::from_raw(key), value.to_vec()).unwrap(); }