Skip to content

Commit

Permalink
support follower read (tikv#5051)
Browse files Browse the repository at this point in the history
* raftstore,server: support follower read

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* raftstore: fix condition check of read on replica

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* raftstore: follower read waits for apply index reaches read index

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* add a test of waiting for read index

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix test_wait_for_apply_index

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* dec pending reads count after follower handle read index cmd

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* update comments

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* remove unused file

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix test_wait_for_apply_index

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* update comments

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* update test_wait_for_apply_index

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* update dependency 'kvproto'

Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers authored Jul 17, 2019
1 parent 7a2b0de commit 8338936
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/raftstore/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,10 @@ impl ApplyDelegate {
ctx: &ApplyContext,
req: &RaftCmdRequest,
) -> Result<(RaftCmdResponse, ApplyResult)> {
fail_point!("on_apply_write_cmd", self.id() == 3, |_| {
unimplemented!();
});

let requests = req.get_requests();
let mut responses = Vec::with_capacity(requests.len());

Expand Down
10 changes: 9 additions & 1 deletion src/raftstore/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2254,7 +2254,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
// ReadIndex can be processed on the replicas.
let is_read_index_request =
request.len() == 1 && request[0].get_cmd_type() == CmdType::ReadIndex;
if !(self.fsm.peer.is_leader() || is_read_index_request) {
let mut read_only = true;
for r in msg.get_requests() {
match r.get_cmd_type() {
CmdType::Get | CmdType::Snap | CmdType::ReadIndex => (),
_ => read_only = false,
}
}
let allow_follower_read = read_only && msg.get_header().get_follower_read();
if !(self.fsm.peer.is_leader() || is_read_index_request || allow_follower_read) {
self.ctx.raft_metrics.invalid_proposal.not_leader += 1;
let leader = self.fsm.peer.get_peer_from_cache(leader_id);
self.fsm.group_state = GroupState::Chaos;
Expand Down
30 changes: 22 additions & 8 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,14 @@ impl Peer {
&& !self.is_merging()
}

fn ready_to_handle_unsafe_follower_read(&self, read_index: u64) -> bool {
// Wait until the follower applies all values before the read. There is still a
// problem if the leader applies fewer values than the follower, the follower read
// could get a newer value, and after that, the leader may read a stale value,
// which violates linearizability.
self.get_store().applied_index() >= read_index && !self.is_splitting() && !self.is_merging()
}

#[inline]
fn is_splitting(&self) -> bool {
self.last_committed_split_idx > self.get_store().applied_index()
Expand Down Expand Up @@ -1354,18 +1362,24 @@ impl Peer {
&& read.cmds[0].0.get_requests().len() == 1
&& read.cmds[0].0.get_requests()[0].get_cmd_type() == CmdType::ReadIndex;

if !is_read_index_request {
let term = self.term();
// Only read index request is valid.
for (_, cb) in read.cmds.drain(..) {
apply::notify_stale_req(term, cb);
}
} else {
let term = self.term();
if is_read_index_request {
for (req, cb) in read.cmds.drain(..) {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
}
self.pending_reads.ready_cnt -= 1;
} else if self.ready_to_handle_unsafe_follower_read(read.read_index.unwrap()) {
for (req, cb) in read.cmds.drain(..) {
if req.get_header().get_follower_read() {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
} else {
apply::notify_stale_req(term, cb);
}
}
self.pending_reads.ready_cnt -= 1;
} else {
self.pending_reads.reads.push_front(read);
}
self.pending_reads.ready_cnt -= 1;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/kv/raftkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl<S: RaftStoreRouter> RaftKv<S> {
header.set_term(ctx.get_term());
}
header.set_sync_log(ctx.get_sync_log());
header.set_follower_read(ctx.get_follower_read());
header
}

Expand Down
1 change: 1 addition & 0 deletions tests/failpoints/cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod test_bootstrap;
mod test_conf_change;
mod test_coprocessor;
mod test_follower_read;
mod test_merge;
mod test_pending_peers;
mod test_snap;
Expand Down
65 changes: 65 additions & 0 deletions tests/failpoints/cases/test_follower_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use fail;
use std::time::Duration;
use test_raftstore::*;
use tikv_util::HandyRwLock;

#[test]
fn test_wait_for_apply_index() {
let _guard = crate::setup();
let mut cluster = new_server_cluster(0, 3);

// Increase the election tick to make this test case running reliably.
configure_for_lease_read(&mut cluster, Some(50), Some(10_000));
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();

let r1 = cluster.run_conf_change();
cluster.must_put(b"k0", b"v0");
let p2 = new_peer(2, 2);
cluster.pd_client.must_add_peer(r1, p2.clone());
let p3 = new_peer(3, 3);
cluster.pd_client.must_add_peer(r1, p3.clone());
must_get_equal(&cluster.get_engine(3), b"k0", b"v0");

let region = cluster.get_region(b"k0");
cluster.must_transfer_leader(region.get_id(), p2.clone());

// Block all write cmd applying of Peer 3.
fail::cfg("on_apply_write_cmd", "sleep(2000)").unwrap();
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(2), b"k1", b"v1");

// Peer 3 does not apply the cmd of putting 'k1' right now, then the follower read must
// be blocked.
must_get_none(&cluster.get_engine(3), b"k1");
let mut request = new_request(
region.get_id(),
region.get_region_epoch().clone(),
vec![new_get_cf_cmd("default", b"k1")],
false,
);
request.mut_header().set_peer(p3.clone());
request.mut_header().set_follower_read(true);
let (cb, rx) = make_cb(&request);
cluster
.sim
.rl()
.async_command_on_node(3, request, cb)
.unwrap();
// Must timeout here
assert!(rx.recv_timeout(Duration::from_millis(500)).is_err());
fail::cfg("on_apply_write_cmd", "off").unwrap();

// After write cmd applied, the follower read will be executed.
match rx.recv_timeout(Duration::from_secs(3)) {
Ok(resp) => {
assert_eq!(resp.get_responses().len(), 1);
assert_eq!(resp.get_responses()[0].get_get().get_value(), b"v1");
}
Err(_) => panic!("follower read failed"),
}
}
62 changes: 62 additions & 0 deletions tests/integrations/storage/test_raftkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use kvproto::kvrpcpb::Context;

use engine::IterOption;
use engine::{CfName, CF_DEFAULT};
use std::thread;
use std::time;
use test_raftstore::*;
use tikv::storage::kv::*;
use tikv::storage::{CFStatistics, Key};
Expand Down Expand Up @@ -123,6 +125,66 @@ fn test_read_index_on_replica() {
);
}

#[test]
fn test_read_on_replica() {
let count = 3;
let mut cluster = new_server_cluster(0, count);
cluster.run();

let k1 = b"k1";
let (k2, v2) = (b"k2", b"v2");
let (k3, v3) = (b"k3", b"v3");
let (k4, v4) = (b"k4", b"v4");

// make sure leader has been elected.
assert_eq!(cluster.must_get(k1), None);

let region = cluster.get_region(b"");
let leader = cluster.leader_of_region(region.get_id()).unwrap();
let leader_storage = cluster.sim.rl().storages[&leader.get_id()].clone();

let mut leader_ctx = Context::new();
leader_ctx.set_region_id(region.get_id());
leader_ctx.set_region_epoch(region.get_region_epoch().clone());
leader_ctx.set_peer(leader.clone());

// write some data
let peers = region.get_peers();
assert_none(&leader_ctx, &leader_storage, k2);
must_put(&leader_ctx, &leader_storage, k2, v2);

// read on follower
let mut follower_peer = None;
let mut follower_id = 0;
for p in peers {
if p.get_id() != leader.get_id() {
follower_id = p.get_id();
follower_peer = Some(p.clone());
break;
}
}

assert!(follower_peer.is_some());
let mut follower_ctx = Context::new();
follower_ctx.set_region_id(region.get_id());
follower_ctx.set_region_epoch(region.get_region_epoch().clone());
follower_ctx.set_peer(follower_peer.as_ref().unwrap().clone());
follower_ctx.set_follower_read(true);
let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
assert_has(&follower_ctx, &follower_storage, k2, v2);

must_put(&leader_ctx, &leader_storage, k3, v3);
assert_has(&follower_ctx, &follower_storage, k3, v3);

cluster.stop_node(follower_id);
must_put(&leader_ctx, &leader_storage, k4, v4);
cluster.run_node(follower_id).unwrap();
let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
// sleep to ensure the follower has received a heartbeat from the leader
thread::sleep(time::Duration::from_millis(300));
assert_has(&follower_ctx, &follower_storage, k4, v4);
}

fn must_put<E: Engine>(ctx: &Context, engine: &E, key: &[u8], value: &[u8]) {
engine.put(ctx, Key::from_raw(key), value.to_vec()).unwrap();
}
Expand Down

0 comments on commit 8338936

Please sign in to comment.