Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

support follower read #5051

Merged
merged 14 commits on
Jul 17, 2019
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ codec = { path = "components/codec" }
tipb_helper = { path = "components/tipb_helper" }
tipb = { git = "https://github.com/pingcap/tipb.git" }
# TODO: use master branch after the next version is released.
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-3.0" }
kvproto = { git = "https://github.com/5kbpers/kvproto.git", branch = "support-follower-read" }
5kbpers marked this conversation as resolved.
Show resolved Hide resolved
log_wrappers = { path = "components/log_wrappers" }
engine = { path = "components/engine" }
tikv_util = { path = "components/tikv_util" }
Expand Down
2 changes: 1 addition & 1 deletion components/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ portable = ["engine_rocksdb/portable"]
sse = ["engine_rocksdb/sse"]

[dependencies]
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-3.0" }
kvproto = { git = "https://github.com/5kbpers/kvproto.git", branch = "support-follower-read" }
protobuf = "~2.0"
raft = "0.4"
quick-error = "1.2.2"
Expand Down
2 changes: 1 addition & 1 deletion components/log_wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ publish = false
slog = "2.3"
slog-term = "2.4"
hex = "0.3"
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-3.0" }
kvproto = { git = "https://github.com/5kbpers/kvproto.git", branch = "support-follower-read" }
tikv_alloc = { path = "../tikv_alloc", default-features = false }
2 changes: 1 addition & 1 deletion components/test_coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false
tikv_util = { path = "../tikv_util" }
tikv = { path = "../../", default-features = false }
tipb = { git = "https://github.com/pingcap/tipb.git" }
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-3.0" }
kvproto = { git = "https://github.com/5kbpers/kvproto.git", branch = "support-follower-read" }
protobuf = "~2.0"
test_storage = { path = "../test_storage" }
futures = "0.1"
2 changes: 1 addition & 1 deletion components/test_raftstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false
[dependencies]
tikv_util = { path = "../tikv_util" }
tikv = { path = "../../", default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-3.0" }
kvproto = { git = "https://github.com/5kbpers/kvproto.git", branch = "support-follower-read" }
protobuf = "~2.0"
tempfile = "3.0"
raft = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion components/test_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ publish = false
tikv_util = { path = "../tikv_util" }
tikv = { path = "../../", default-features = false }
test_raftstore = { path = "../test_raftstore" }
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-3.0" }
kvproto = { git = "https://github.com/5kbpers/kvproto.git", branch = "support-follower-read" }
futures = "0.1"
10 changes: 9 additions & 1 deletion src/raftstore/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2254,7 +2254,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
// ReadIndex can be processed on the replicas.
let is_read_index_request =
request.len() == 1 && request[0].get_cmd_type() == CmdType::ReadIndex;
if !(self.fsm.peer.is_leader() || is_read_index_request) {
let mut read_only = true;
for r in msg.get_requests() {
match r.get_cmd_type() {
CmdType::Get | CmdType::Snap | CmdType::ReadIndex => (),
_ => read_only = false,
}
}
let allow_follower_read = read_only && msg.get_header().get_follower_read();
if !(self.fsm.peer.is_leader() || is_read_index_request || allow_follower_read) {
self.ctx.raft_metrics.invalid_proposal.not_leader += 1;
let leader = self.fsm.peer.get_peer_from_cache(leader_id);
self.fsm.group_state = GroupState::Chaos;
Expand Down
28 changes: 20 additions & 8 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,13 @@ impl Peer {
&& !self.is_merging()
}

fn ready_to_handle_follower_read(&self, read_index: u64) -> bool {
5kbpers marked this conversation as resolved.
Show resolved Hide resolved
// Wait until the follower applies all values before the read. There is still a
// problem if the leader applies fewer values than the follower, the follower read
// could get a newer value.
5kbpers marked this conversation as resolved.
Show resolved Hide resolved
self.get_store().applied_index() >= read_index && !self.is_splitting() && !self.is_merging()
}

#[inline]
fn is_splitting(&self) -> bool {
self.last_committed_split_idx > self.get_store().applied_index()
Expand Down Expand Up @@ -1354,18 +1361,23 @@ impl Peer {
&& read.cmds[0].0.get_requests().len() == 1
&& read.cmds[0].0.get_requests()[0].get_cmd_type() == CmdType::ReadIndex;

if !is_read_index_request {
let term = self.term();
// Only read index request is valid.
for (_, cb) in read.cmds.drain(..) {
apply::notify_stale_req(term, cb);
}
} else {
let term = self.term();
if is_read_index_request {
5kbpers marked this conversation as resolved.
Show resolved Hide resolved
for (req, cb) in read.cmds.drain(..) {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
}
} else if self.ready_to_handle_follower_read(read.read_index.unwrap()) {
for (req, cb) in read.cmds.drain(..) {
if req.get_header().get_follower_read() {
cb.invoke_read(self.handle_read(ctx, req, true, read.read_index));
} else {
apply::notify_stale_req(term, cb);
}
}
self.pending_reads.ready_cnt -= 1;
} else {
self.pending_reads.reads.push_front(read);
}
self.pending_reads.ready_cnt -= 1;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/kv/raftkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl<S: RaftStoreRouter> RaftKv<S> {
header.set_term(ctx.get_term());
}
header.set_sync_log(ctx.get_sync_log());
header.set_follower_read(ctx.get_follower_read());
header
}

Expand Down
1 change: 1 addition & 0 deletions tests/failpoints/cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod test_bootstrap;
mod test_conf_change;
mod test_coprocessor;
mod test_follower_read;
mod test_merge;
mod test_pending_peers;
mod test_snap;
Expand Down
40 changes: 40 additions & 0 deletions tests/failpoints/cases/test_follower_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::sync::Arc;
5kbpers marked this conversation as resolved.
Show resolved Hide resolved

use fail;

use test_raftstore::*;

#[test]
fn test_wait_for_apply_index() {
    // Three-node cluster; the region under test is replicated to all nodes.
    let mut cluster = new_node_cluster(0, 3);

    // Increase the election tick to make this test case running reliably.
    configure_for_lease_read(&mut cluster, Some(50), Some(10_000));
    let pd_client = Arc::clone(&cluster.pd_client);
    pd_client.disable_default_operator();

    // Bootstrap the region on store 1, then replicate it to stores 2 and 3.
    let region_id = cluster.run_conf_change();
    cluster.must_put(b"k0", b"v0");
    let peer2 = new_peer(2, 2);
    cluster.pd_client.must_add_peer(region_id, peer2.clone());
    let peer3 = new_peer(3, 3);
    cluster.pd_client.must_add_peer(region_id, peer3.clone());
    must_get_equal(&cluster.get_engine(3), b"k0", b"v0");

    // Ensure peer 3 is not leader to use fail point 'on_raft_ready'
    let region = cluster.get_region(b"k1");
    if cluster.leader_of_region(region.get_id()) == Some(peer3) {
        cluster.must_transfer_leader(region.get_id(), peer2);
    }

    // Pause peer 3's raft ready handling so it cannot apply new entries,
    // then push several writes through the leader.
    fail::cfg("on_raft_ready", "pause").unwrap();
    cluster.must_put(b"k1", b"v1");
    cluster.must_put(b"k2", b"v2");
    cluster.must_put(b"k3", b"v3");
    cluster.must_put(b"k4", b"v4");
    cluster.must_put(b"k5", b"v5");
    fail::cfg("on_raft_ready", "off").unwrap();

    // Peer 3 does not apply the value of 'k5' right now, but a read routed
    // through the cluster still returns it once the apply catches up.
    must_get_none(&cluster.get_engine(3), b"k5");
    assert_eq!(cluster.must_get(b"k5").unwrap(), b"v5");
}
62 changes: 62 additions & 0 deletions tests/integrations/storage/test_raftkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use kvproto::kvrpcpb::Context;

use engine::IterOption;
use engine::{CfName, CF_DEFAULT};
use std::thread;
use std::time;
use test_raftstore::*;
use tikv::storage::kv::*;
use tikv::storage::{CFStatistics, Key};
Expand Down Expand Up @@ -123,6 +125,66 @@ fn test_read_index_on_replica() {
);
}

#[test]
fn test_read_on_replica() {
    // Three-node server cluster; reads are issued against a follower with
    // the follower_read flag set in the request context.
    let node_count = 3;
    let mut cluster = new_server_cluster(0, node_count);
    cluster.run();

    let k1 = b"k1";
    let (k2, v2) = (b"k2", b"v2");
    let (k3, v3) = (b"k3", b"v3");
    let (k4, v4) = (b"k4", b"v4");

    // make sure leader has been elected.
    assert_eq!(cluster.must_get(k1), None);

    let region = cluster.get_region(b"");
    let leader = cluster.leader_of_region(region.get_id()).unwrap();
    let leader_storage = cluster.sim.rl().storages[&leader.get_id()].clone();

    // Context targeting the leader peer for the writes below.
    let mut leader_ctx = Context::new();
    leader_ctx.set_region_id(region.get_id());
    leader_ctx.set_region_epoch(region.get_region_epoch().clone());
    leader_ctx.set_peer(leader.clone());

    // write some data
    let peers = region.get_peers();
    assert_none(&leader_ctx, &leader_storage, k2);
    must_put(&leader_ctx, &leader_storage, k2, v2);

    // Pick any peer that is not the leader to serve the follower read.
    let follower_peer = peers
        .iter()
        .find(|p| p.get_id() != leader.get_id())
        .cloned();
    assert!(follower_peer.is_some());
    let follower_peer = follower_peer.unwrap();
    let follower_id = follower_peer.get_id();

    // Context targeting the follower, with follower read enabled.
    let mut follower_ctx = Context::new();
    follower_ctx.set_region_id(region.get_id());
    follower_ctx.set_region_epoch(region.get_region_epoch().clone());
    follower_ctx.set_peer(follower_peer.clone());
    follower_ctx.set_follower_read(true);
    let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
    assert_has(&follower_ctx, &follower_storage, k2, v2);

    must_put(&leader_ctx, &leader_storage, k3, v3);
    assert_has(&follower_ctx, &follower_storage, k3, v3);

    // Restart the follower so it misses a write, then verify a follower read
    // still observes the value once the node has caught up with the leader.
    cluster.stop_node(follower_id);
    must_put(&leader_ctx, &leader_storage, k4, v4);
    cluster.run_node(follower_id).unwrap();
    let follower_storage = cluster.sim.rl().storages[&follower_id].clone();
    // sleep to ensure the follower has received a heartbeat from the leader
    thread::sleep(time::Duration::from_millis(300));
    assert_has(&follower_ctx, &follower_storage, k4, v4);
}

/// Writes a single raw key/value pair through `engine`, panicking (and thus
/// failing the test) if the put does not succeed.
fn must_put<E: Engine>(ctx: &Context, engine: &E, key: &[u8], value: &[u8]) {
    let encoded_key = Key::from_raw(key);
    engine.put(ctx, encoded_key, value.to_vec()).unwrap();
}
Expand Down