Skip to content

Commit

Permalink
*: add protection against unlimited log growth(#131) (#398)
Browse files Browse the repository at this point in the history
- Add protection against uncommitted log growth of leader.
  This protection is done by estimate size of entry data
  rather than count of entry. So entry with no data will
  never be dropped
- Add tests for raft and raw_node

Signed-off-by: lyzongyuan <lyzongyuan@gmail.com>
  • Loading branch information
c0x0o authored Oct 26, 2020
1 parent 6998aed commit a424fdb
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 19 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ There is a lot of work to do to get there, and we're very excited for you to con
4. **Prepare it.** Groom your code before you send a PR. The CI will verify you did all of this.
* Run `cargo test --all`, make sure they all pass.
* Run `cargo bench --all -- --test`, make sure they all pass.
* Run `cargo clippy -all -- -D all`, fix any lints.
* Run `cargo clippy --all --all-targets -- -D clippy::all`, fix any lints.
* Run `cargo fmt --all`.
5. **Submit the Pull Request.**
* Make sure to link to the issue you are addressing. If there is no related issue, please describe what the PR resolves.
Expand Down
2 changes: 1 addition & 1 deletion benches/suites/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
node.raft.raft_log.store.wl().append(&entries).expect("");
node.raft.raft_log.unstable.offset = 102;
// This increases 'committed_index' to `last_index` because there is only one node in quorum.
node.raft.append_entry(&mut unstable_entries);
let _ = node.raft.append_entry(&mut unstable_entries);

let mut snap = Snapshot::default();
snap.set_data(vec![0; 8 * 1024 * 1024]);
Expand Down
168 changes: 162 additions & 6 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2682,7 +2682,7 @@ fn test_bcast_beat() {
sm.become_candidate();
sm.become_leader();
for i in 0..10 {
sm.append_entry(&mut [empty_entry(0, i as u64 + 1)]);
let _ = sm.append_entry(&mut [empty_entry(0, i as u64 + 1)]);
}
// slow follower
let mut_pr = |sm: &mut Interface, n, matched, next_idx| {
Expand Down Expand Up @@ -2831,7 +2831,7 @@ fn test_send_append_for_progress_probe() {
// we expect that raft will only send out one msgAPP on the first
// loop. After that, the follower is paused until a heartbeat response is
// received.
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
let _ = r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
let msg = r.read_messages();
assert_eq!(msg.len(), 1);
Expand All @@ -2840,7 +2840,7 @@ fn test_send_append_for_progress_probe() {

assert!(r.prs().get(2).unwrap().paused);
for _ in 0..10 {
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
let _ = r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
assert_eq!(r.read_messages().len(), 0);
}
Expand Down Expand Up @@ -2877,7 +2877,7 @@ fn test_send_append_for_progress_replicate() {
r.mut_prs().get_mut(2).unwrap().become_replicate();

for _ in 0..10 {
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
let _ = r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
assert_eq!(r.read_messages().len(), 1);
}
Expand All @@ -2893,7 +2893,7 @@ fn test_send_append_for_progress_snapshot() {
r.mut_prs().get_mut(2).unwrap().become_snapshot(10);

for _ in 0..10 {
r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
let _ = r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r.send_append(2);
assert_eq!(r.read_messages().len(), 0);
}
Expand Down Expand Up @@ -3126,7 +3126,7 @@ fn test_new_leader_pending_config() {
let mut e = Entry::default();
if add_entry {
e.set_entry_type(EntryType::EntryNormal);
r.append_entry(&mut [e]);
let _ = r.append_entry(&mut [e]);
}
r.become_candidate();
r.become_leader();
Expand Down Expand Up @@ -5075,3 +5075,159 @@ fn test_read_when_quorum_becomes_less() {
.unwrap();
assert!(!network.peers[&1].read_states.is_empty());
}

#[test]
fn test_uncommitted_entries_size_limit() {
let l = default_logger();
let config = &Config {
id: 1,
max_uncommitted_size: 12,
..Config::default()
};
let mut nt = Network::new_with_config(vec![None, None, None], config, &l);
let data = b"hello world!".to_vec();
let mut entry = Entry::default();
entry.data = data.to_vec();
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

// should return ok
let result = nt.dispatch(vec![msg.clone()].to_vec());
assert!(result.is_ok());

// then next proposal should be dropped
let result = nt.dispatch(vec![msg].to_vec());
assert!(!result.is_ok());
assert_eq!(result.unwrap_err(), raft::Error::ProposalDropped);

// but entry with empty size should be accepted
let entry = Entry::default();
let empty_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
let result = nt.dispatch(vec![empty_msg].to_vec());
assert!(result.is_ok());

// after reduce, new proposal should be accecpted
let mut entry = Entry::default();
entry.data = data;
entry.index = 3;
nt.peers
.get_mut(&1)
.unwrap()
.reduce_uncommitted_size(&[entry]);
assert_eq!(nt.peers.get_mut(&1).unwrap().uncommitted_size(), 0);

// a huge proposal should be accepted when there is no uncommitted entry,
// even it's bigger than max_uncommitted_size
let mut entry = Entry::default();
entry.data = b"hello world and raft".to_vec();
let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
let result = nt.dispatch(vec![long_msg].to_vec());
assert!(result.is_ok());

// but another huge one will be dropped
let mut entry = Entry::default();
entry.data = b"hello world and raft".to_vec();
let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
let result = nt.dispatch(vec![long_msg].to_vec());
assert!(!result.is_ok());

// entry with empty size should still be accepted
let entry = Entry::default();
let empty_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
let result = nt.dispatch(vec![empty_msg].to_vec());
assert!(result.is_ok());
}

#[test]
fn test_uncommitted_entry_after_leader_election() {
let l = default_logger();
let config = &Config {
id: 1,
max_uncommitted_size: 12,
..Config::default()
};
let mut nt = Network::new_with_config(vec![None, None, None, None, None], config, &l);
let data = b"hello world!".to_vec();
let mut entry = Entry::default();
entry.data = data;
let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

// create a uncommitted entry on node2
nt.cut(1, 3);
nt.cut(1, 4);
nt.cut(1, 5);
nt.send(vec![msg]);

// now isolate master and make node2 as master
nt.isolate(1);
// ignore message append, cluster only work on election
nt.ignore(MessageType::MsgAppend);
nt.send(vec![new_message(2, 2, MessageType::MsgHup, 0)]);

// uncommitted log size should be 0 on node2,
// because we set uncommitted size to 0 rather than re-computing it,
// which means max_uncommitted_size is a soft limit
assert_eq!(nt.peers.get_mut(&2).unwrap().state, raft::StateRole::Leader);
assert_eq!(nt.peers.get_mut(&2).unwrap().uncommitted_size(), 0);
}

#[test]
fn test_uncommitted_state_advance_ready_from_last_term() {
let l = default_logger();
let config = &Config {
id: 1,
max_uncommitted_size: 12,
..Config::default()
};
let mut nt = Network::new_with_config(vec![None, None, None, None, None], config, &l);

let data = b"hello world!".to_vec();
let mut ent = Entry::default();
ent.data = data.clone();

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

nt.send(vec![new_message_with_entries(
1,
1,
MessageType::MsgPropose,
vec![ent.clone()],
)]);
nt.send(vec![new_message_with_entries(
1,
1,
MessageType::MsgPropose,
vec![ent.clone()],
)]);

// now node2 has 2 committed entries
// make node2 leader
nt.send(vec![new_message(2, 2, MessageType::MsgHup, 0)]);
assert_eq!(nt.peers.get_mut(&2).unwrap().state, raft::StateRole::Leader);

nt.isolate(2);
// create one uncommitted entry
nt.send(vec![new_message_with_entries(
2,
2,
MessageType::MsgPropose,
vec![ent.clone()],
)]);

let mut ent1 = ent.clone();
ent1.index = 1;
let mut ent2 = ent;
ent2.index = 2;

// simulate advance 2 entries when node2 is follower
nt.peers
.get_mut(&2)
.unwrap()
.reduce_uncommitted_size(&[ent1, ent2]);

// uncommitted size should be 12(remain unchanged since there's only one uncommitted entries)
assert_eq!(nt.peers.get_mut(&2).unwrap().uncommitted_size(), data.len());
}
2 changes: 1 addition & 1 deletion harness/tests/integration_cases/test_raft_paper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ fn test_leader_bcast_beat() {
r.become_candidate();
r.become_leader();
for i in 0..10 {
r.append_entry(&mut [empty_entry(0, i as u64 + 1)]);
let _ = r.append_entry(&mut [empty_entry(0, i as u64 + 1)]);
}

for _ in 0..hi {
Expand Down
65 changes: 62 additions & 3 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,21 @@ fn must_cmp_ready(
fn new_raw_node(
id: u64,
peers: Vec<u64>,
election: usize,
heartbeat: usize,
election_tick: usize,
heartbeat_tick: usize,
storage: MemStorage,
logger: &Logger,
) -> RawNode<MemStorage> {
let config = new_test_config(id, election_tick, heartbeat_tick);
new_raw_node_with_config(peers, &config, storage, logger)
}

fn new_raw_node_with_config(
peers: Vec<u64>,
config: &Config,
storage: MemStorage,
logger: &Logger,
) -> RawNode<MemStorage> {
let config = new_test_config(id, election, heartbeat);
if storage.initial_state().unwrap().initialized() && peers.is_empty() {
panic!("new_raw_node with empty peers on initialized store");
}
Expand Down Expand Up @@ -728,3 +737,53 @@ fn test_set_priority() {
assert_eq!(raw_node.raft.priority, p);
}
}

// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
// partitioned from a quorum of nodes. It verifies that the leader's log is
// protected from unbounded growth even as new entries continue to be proposed.
// This protection is provided by the max_uncommitted_size configuration.
#[test]
fn test_bounded_uncommitted_entries_growth_with_partition() {
let l = default_logger();
let config = &Config {
id: 1,
max_uncommitted_size: 12,
..Config::default()
};
let s = new_storage();
let mut raw_node = new_raw_node_with_config(vec![1], config, s.clone(), &l);

// wait raw_node to be leader
raw_node.campaign().unwrap();
loop {
let rd = raw_node.ready();
if rd
.ss()
.map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id)
{
break;
}

raw_node.advance(rd);
}

// should be accepted
let data = b"hello world!".to_vec();
let result = raw_node.propose(vec![], data);
assert!(result.is_ok());

// shoule be dropped
let data = b"hello world!".to_vec();
let result = raw_node.propose(vec![], data);
assert!(!result.is_ok());
assert_eq!(result.unwrap_err(), Error::ProposalDropped);

// should be accepted when previous data has been committed
let rd = raw_node.ready();
s.wl().append(rd.entries()).unwrap();
raw_node.advance(rd);

let data = b"hello world!".to_vec();
let result = raw_node.propose(vec![], data);
assert!(result.is_ok());
}
12 changes: 12 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

pub use super::read_only::{ReadOnlyOption, ReadState};
use super::util::NO_LIMIT;
use super::{
errors::{Error, Result},
INVALID_ID,
Expand Down Expand Up @@ -90,6 +91,10 @@ pub struct Config {

/// The election priority of this node.
pub priority: u64,

/// Specify maximum of uncommited entry size.
/// When this limit is reached, all proposals to append new log will be dropped
pub max_uncommitted_size: u64,
}

impl Default for Config {
Expand All @@ -110,6 +115,7 @@ impl Default for Config {
skip_bcast_commit: false,
batch_append: false,
priority: 0,
max_uncommitted_size: NO_LIMIT,
}
}
}
Expand Down Expand Up @@ -189,6 +195,12 @@ impl Config {
));
}

if self.max_uncommitted_size < self.max_size_per_msg {
return Err(Error::ConfigInvalid(
"max uncommitted size should greater than max_size_per_msg".to_owned(),
));
}

Ok(())
}
}
4 changes: 4 additions & 0 deletions src/log_unstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl Unstable {
}

/// Append entries to unstable, truncate local block first if overlapped.
///
/// # Panics
///
/// Panics if truncate logs to the entry before snapshot
pub fn truncate_and_append(&mut self, ents: &[Entry]) {
let after = ents[0].index;
if after == self.offset + self.entries.len() as u64 {
Expand Down
Loading

0 comments on commit a424fdb

Please sign in to comment.