support follower read (tikv#5051)
* raftstore,server: support follower read
5kbpers authored and chenfu committed Mar 10, 2020
1 parent afec18e commit 7b8749c
Showing 7 changed files with 101 additions and 13 deletions.
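For context: follower read (later renamed replica read) lets a follower serve read-only requests from its local store instead of shipping every read through the leader. The follower first obtains a read index from the leader via the Raft ReadIndex protocol, waits until its own applied index reaches that read index, and only then answers. A minimal client-side sketch of opting in, assuming the kvproto Context API used in the diffs below (the region, epoch, and peer values are placeholders):

    let mut ctx = Context::new();
    ctx.set_region_id(region_id);   // hypothetical region id
    ctx.set_region_epoch(epoch);    // hypothetical region epoch
    ctx.set_peer(follower_peer);    // target a follower, not the leader
    ctx.set_replica_read(true);     // ask for a follower (replica) read

Requests carrying this flag still go through the ReadIndex handshake, so they are intended to remain linearizable; they only avoid routing the data itself through the leader.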
4 changes: 4 additions & 0 deletions src/raftstore/store/fsm/apply.rs
@@ -1075,6 +1075,10 @@ impl ApplyDelegate {
         ctx: &ApplyContext,
         req: &RaftCmdRequest,
     ) -> Result<(RaftCmdResponse, ApplyResult)> {
+        fail_point!("on_apply_write_cmd", self.id() == 3, |_| {
+            unimplemented!();
+        });
+
         let requests = req.get_requests();
         let mut responses = Vec::with_capacity(requests.len());
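The failpoint above fires only for the peer whose id is 3, which lets the new failpoint test (tests/failpoints/cases/test_replica_read.rs) stall apply on a single follower and verify that a replica read waits for the apply index. A sketch of how such a test might drive it, using the fail crate API TiKV already depends on (cluster setup elided; the "pause" action blocks the apply thread at the failpoint until the configuration is removed):

    fail::cfg("on_apply_write_cmd", "pause").unwrap(); // stall apply on peer 3
    // ... write through the leader, then issue a replica read: it must block ...
    fail::remove("on_apply_write_cmd");                // resume apply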
10 changes: 9 additions & 1 deletion src/raftstore/store/fsm/peer.rs
@@ -2287,7 +2287,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
         // ReadIndex can be processed on the replicas.
         let is_read_index_request =
             request.len() == 1 && request[0].get_cmd_type() == CmdType::ReadIndex;
-        if !(self.fsm.peer.is_leader() || is_read_index_request) {
+        let mut read_only = true;
+        for r in msg.get_requests() {
+            match r.get_cmd_type() {
+                CmdType::Get | CmdType::Snap | CmdType::ReadIndex => (),
+                _ => read_only = false,
+            }
+        }
+        let allow_replica_read = read_only && msg.get_header().get_replica_read();
+        if !(self.fsm.peer.is_leader() || is_read_index_request || allow_replica_read) {
             self.ctx.raft_metrics.invalid_proposal.not_leader += 1;
             let leader = self.fsm.peer.get_peer_from_cache(leader_id);
             self.fsm.group_state = GroupState::Chaos;
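Only batches made up entirely of read commands (Get, Snap, ReadIndex) may bypass the leader check, and only when the client set replica_read in the header; any write in the batch forces the usual not-leader rejection. A hedged sketch of a request that would pass this gate (kvproto types; the region id is a placeholder):

    let mut req = RaftCmdRequest::new();
    req.mut_header().set_region_id(region_id);  // placeholder
    req.mut_header().set_replica_read(true);    // opt in to replica read
    let mut read = Request::new();
    read.set_cmd_type(CmdType::Snap);           // Get | Snap | ReadIndex allowed
    req.mut_requests().push(read);              // a Put here would disqualify it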
30 changes: 22 additions & 8 deletions src/raftstore/store/peer.rs
@@ -1039,6 +1039,14 @@ impl Peer {
             && !self.is_merging()
     }
 
+    fn ready_to_handle_unsafe_replica_read(&self, read_index: u64) -> bool {
+        // Wait until the follower applies all values before the read. There is still a
+        // problem: if the leader applies fewer values than the follower, the follower
+        // read could get a newer value, and after that, the leader may read a stale
+        // value, which violates linearizability.
+        self.get_store().applied_index() >= read_index && !self.is_splitting() && !self.is_merging()
+    }
+
     #[inline]
     fn is_splitting(&self) -> bool {
         self.last_committed_split_idx > self.get_store().applied_index()
@@ -1357,18 +1365,24 @@ impl Peer {
                     && read.cmds[0].0.get_requests().len() == 1
                     && read.cmds[0].0.get_requests()[0].get_cmd_type() == CmdType::ReadIndex;
 
-                if !is_read_index_request {
-                    let term = self.term();
-                    // Only read index request is valid.
-                    for (_, cb) in read.cmds.drain(..) {
-                        apply::notify_stale_req(term, cb);
-                    }
-                } else {
+                let term = self.term();
+                if is_read_index_request {
                     for (req, cb) in read.cmds.drain(..) {
                         cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
                     }
+                    self.pending_reads.ready_cnt -= 1;
+                } else if self.ready_to_handle_unsafe_replica_read(read.read_index.unwrap()) {
+                    for (req, cb) in read.cmds.drain(..) {
+                        if req.get_header().get_replica_read() {
+                            cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
+                        } else {
+                            apply::notify_stale_req(term, cb);
+                        }
+                    }
+                    self.pending_reads.ready_cnt -= 1;
+                } else {
+                    self.pending_reads.reads.push_front(read);
                 }
-                self.pending_reads.ready_cnt -= 1;
             }
         }
     }
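Taken together, the peer-side flow for a replica read is: (1) the follower receives a read-only request with replica_read set and proposes a ReadIndex to the leader; (2) the leader confirms its leadership and hands back a read index; (3) the request sits in pending_reads until ready_to_handle_unsafe_replica_read holds, i.e. applied_index >= read_index and the region is neither splitting nor merging; (4) the callback is invoked and the read is served from local storage. Without step (3), a lagging follower could answer with data older than what the ReadIndex already acknowledged as committed.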
1 change: 1 addition & 0 deletions src/storage/kv/raftkv.rs
@@ -171,6 +171,7 @@ impl<S: RaftStoreRouter> RaftKv<S> {
             header.set_term(ctx.get_term());
         }
         header.set_sync_log(ctx.get_sync_log());
+        header.set_replica_read(ctx.get_replica_read());
         header
     }
 
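This one-line change is the plumbing between layers: RaftKv copies replica_read from the client-supplied storage Context into the RaftRequestHeader, which is exactly the flag the PeerFsmDelegate check above reads back with get_replica_read().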
1 change: 1 addition & 0 deletions tests/failpoints/cases/mod.rs
@@ -5,6 +5,7 @@ mod test_conf_change;
 mod test_coprocessor;
 mod test_merge;
 mod test_pending_peers;
+mod test_replica_read;
 mod test_snap;
 mod test_split_region;
 mod test_stale_peer;
7 changes: 3 additions & 4 deletions tests/failpoints/cases/test_replica_read.rs
@@ -3,10 +3,6 @@
 use std::sync::Arc;
 
 use fail;
-use std::mem;
-use std::sync::atomic::AtomicBool;
-use std::sync::Mutex;
-use std::thread;
 use std::time::Duration;
 use test_raftstore::*;
 use tikv_util::HandyRwLock;
@@ -67,6 +63,6 @@ fn test_wait_for_apply_index() {
         Err(_) => panic!("follower read failed"),
     }
 }
 
 #[test]
 fn test_duplicate_read_index_ctx() {
@@ -139,3 +136,3 @@ fn test_duplicate_read_index_ctx() {
     assert!(rx2.recv_timeout(Duration::from_millis(500)).is_ok());
     assert!(rx3.recv_timeout(Duration::from_millis(500)).is_ok());
 }
61 changes: 61 additions & 0 deletions tests/integrations/storage/test_raftkv.rs
@@ -9,6 +9,8 @@ use raft::eraftpb::MessageType;
 
 use engine::IterOption;
 use engine::{CfName, CF_DEFAULT};
+use std::thread;
+use std::time;
 use test_raftstore::*;
 use tikv::storage::kv::*;
 use tikv::storage::{CFStatistics, Key};
@@ -192,6 +194,66 @@ fn test_invaild_read_index_when_no_leader() {
         .contains("can not read index due to no leader"));
 }
 
+#[test]
+fn test_read_on_replica() {
+    let count = 3;
+    let mut cluster = new_server_cluster(0, count);
+    cluster.run();
+
+    let k1 = b"k1";
+    let (k2, v2) = (b"k2", b"v2");
+    let (k3, v3) = (b"k3", b"v3");
+    let (k4, v4) = (b"k4", b"v4");
+
+    // Make sure the leader has been elected.
+    assert_eq!(cluster.must_get(k1), None);
+
+    let region = cluster.get_region(b"");
+    let leader = cluster.leader_of_region(region.get_id()).unwrap();
+    let leader_storage = cluster.sim.rl().storages[&leader.get_id()].clone();
+
+    let mut leader_ctx = Context::new();
+    leader_ctx.set_region_id(region.get_id());
+    leader_ctx.set_region_epoch(region.get_region_epoch().clone());
+    leader_ctx.set_peer(leader.clone());
+
+    // Write some data through the leader.
+    let peers = region.get_peers();
+    assert_none(&leader_ctx, &leader_storage, k2);
+    must_put(&leader_ctx, &leader_storage, k2, v2);
+
+    // Read on a follower.
+    let mut follower_peer = None;
+    let mut follower_id = 0;
+    for p in peers {
+        if p.get_id() != leader.get_id() {
+            follower_id = p.get_id();
+            follower_peer = Some(p.clone());
+            break;
+        }
+    }
+
+    assert!(follower_peer.is_some());
+    let mut follower_ctx = Context::new();
+    follower_ctx.set_region_id(region.get_id());
+    follower_ctx.set_region_epoch(region.get_region_epoch().clone());
+    follower_ctx.set_peer(follower_peer.as_ref().unwrap().clone());
+    follower_ctx.set_replica_read(true);
+    let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
+    assert_has(&follower_ctx, &follower_storage, k2, v2);
+
+    must_put(&leader_ctx, &leader_storage, k3, v3);
+    assert_has(&follower_ctx, &follower_storage, k3, v3);
+
+    cluster.stop_node(follower_id);
+    must_put(&leader_ctx, &leader_storage, k4, v4);
+    cluster.run_node(follower_id).unwrap();
+    let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
+    // Sleep to ensure the follower has received a heartbeat from the leader.
+    thread::sleep(time::Duration::from_millis(300));
+    assert_has(&follower_ctx, &follower_storage, k4, v4);
+}
+
 fn must_put<E: Engine>(ctx: &Context, engine: &E, key: &[u8], value: &[u8]) {
     engine.put(ctx, Key::from_raw(key), value.to_vec()).unwrap();
 }
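One caveat about the restart scenario above: the fixed 300 ms sleep only makes it likely that the restarted follower has re-established contact with the leader, so the test can be flaky on a loaded machine. A retry loop is the more robust pattern; a hypothetical helper under the same imports (not part of the real test_raftstore API):

    // Hypothetical: poll a condition instead of sleeping a fixed interval.
    fn eventually<F: Fn() -> bool>(mut tries: u32, check: F) {
        while tries > 0 {
            if check() {
                return;
            }
            thread::sleep(time::Duration::from_millis(50));
            tries -= 1;
        }
        panic!("condition not met in time");
    }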
