Skip to content

Commit

Permalink
Check pending conf change before campaign (#225)
Browse files Browse the repository at this point in the history
Fix #221.
  • Loading branch information
BusyJay authored Apr 29, 2019
1 parent aa131db commit 64571f0
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 45 deletions.
3 changes: 2 additions & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ fn send_propose(sender: mpsc::Sender<Msg>) {
cb: Box::new(move || {
s1.send(0).unwrap();
}),
}).unwrap();
})
.unwrap();

let n = r1.recv().unwrap();
assert_eq!(n, 0);
Expand Down
86 changes: 52 additions & 34 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,7 @@ impl<T: Storage> Raft<T> {
self.bcast_heartbeat_with_ctx(ctx)
}

#[cfg_attr(
feature = "cargo-clippy",
allow(clippy::needless_pass_by_value)
)]
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
fn bcast_heartbeat_with_ctx(&mut self, ctx: Option<Vec<u8>>) {
let self_id = self.id;
let mut prs = self.take_prs();
Expand Down Expand Up @@ -1002,35 +999,7 @@ impl<T: Storage> Raft<T> {
fail_point!("before_step");

match m.get_msg_type() {
MessageType::MsgHup => if self.state != StateRole::Leader {
let ents = self
.raft_log
.slice(
self.raft_log.applied + 1,
self.raft_log.committed + 1,
raft_log::NO_LIMIT,
).expect("unexpected error getting unapplied entries");
let n = self.num_pending_conf(&ents);
if n != 0 && self.raft_log.committed > self.raft_log.applied {
warn!(
"{} cannot campaign at term {} since there are still {} pending \
configuration changes to apply",
self.tag, self.term, n
);
return Ok(());
}
info!(
"{} is starting a new election at term {}",
self.tag, self.term
);
if self.pre_vote {
self.campaign(CAMPAIGN_PRE_ELECTION);
} else {
self.campaign(CAMPAIGN_ELECTION);
}
} else {
debug!("{} ignoring MsgHup because already leader", self.tag);
},
MessageType::MsgHup => self.hup(false),
MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => {
// We can vote if this is a repeat of a vote we've already cast...
let can_vote = (self.vote == m.get_from()) ||
Expand Down Expand Up @@ -1079,6 +1048,55 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Starts an election campaign on this node, unless doing so is unsafe.
///
/// The call is a no-op (apart from logging) when the node is already the
/// leader, or when `num_pending_conf` reports configuration changes among
/// the entries between the scan start and the committed index.
///
/// When `transfer_leader` is true the campaign uses `CAMPAIGN_TRANSFER`;
/// otherwise `self.pre_vote` chooses between `CAMPAIGN_PRE_ELECTION` and
/// `CAMPAIGN_ELECTION`.
///
/// # Panics
///
/// Panics if the raft log fails to return the scanned range of entries.
fn hup(&mut self, transfer_leader: bool) {
    if self.state == StateRole::Leader {
        debug!("{} ignoring MsgHup because already leader", self.tag);
        return;
    }

    // A pending snapshot reports its index through `maybe_first_index`.
    // The snapshot has already updated the configuration, so only the
    // entries from that point on need to be scanned for conf changes;
    // without unstable data the scan starts right after the applied index.
    let scan_from = self
        .raft_log
        .unstable
        .maybe_first_index()
        .unwrap_or_else(|| self.raft_log.applied + 1);
    let scan_to = self.raft_log.committed + 1;

    let unapplied = match self.raft_log.slice(scan_from, scan_to, raft_log::NO_LIMIT) {
        Ok(entries) => entries,
        Err(err) => panic!(
            "{} unexpected error getting unapplied entries [{}, {}): {:?}",
            self.tag, scan_from, scan_to, err
        ),
    };

    let pending_conf_changes = self.num_pending_conf(&unapplied);
    if pending_conf_changes != 0 {
        warn!(
            "{} cannot campaign at term {} since there are still {} pending \
             configuration changes to apply",
            self.tag, self.term, pending_conf_changes
        );
        return;
    }

    info!(
        "{} is starting a new election at term {}",
        self.tag, self.term
    );

    // A leadership transfer takes precedence over the pre-vote setting.
    let campaign_type = if transfer_leader {
        CAMPAIGN_TRANSFER
    } else if self.pre_vote {
        CAMPAIGN_PRE_ELECTION
    } else {
        CAMPAIGN_ELECTION
    };
    self.campaign(campaign_type);
}

fn log_vote_approve(&self, m: &Message) {
info!(
"{} [logterm: {}, index: {}, vote: {}] cast {:?} for {} [logterm: {}, index: {}] \
Expand Down Expand Up @@ -1632,7 +1650,7 @@ impl<T: Storage> Raft<T> {
// Leadership transfers never use pre-vote even if self.pre_vote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
self.campaign(CAMPAIGN_TRANSFER);
self.hup(true);
} else {
info!(
"{} received MsgTimeoutNow from {} but is not promotable",
Expand Down
8 changes: 3 additions & 5 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ impl Ready {
(match since_idx {
None => raft.raft_log.next_entries(),
Some(idx) => raft.raft_log.next_entries_since(idx),
}).unwrap_or_else(Vec::new),
})
.unwrap_or_else(Vec::new),
);
let ss = raft.soft_state();
if &ss != prev_ss {
Expand Down Expand Up @@ -286,10 +287,7 @@ impl<T: Storage> RawNode<T> {
}

/// ProposeConfChange proposes a config change.
#[cfg_attr(
feature = "cargo-clippy",
allow(clippy::needless_pass_by_value)
)]
#[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))]
pub fn propose_conf_change(&mut self, context: Vec<u8>, cc: ConfChange) -> Result<()> {
let data = protobuf::Message::write_to_bytes(&cc)?;
let mut m = Message::new();
Expand Down
3 changes: 2 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ pub fn limit_size<T: Message + Clone>(entries: &mut Vec<T>, max: u64) {
size += u64::from(Message::compute_size(e));
size <= max
}
}).count();
})
.count();

entries.truncate(limit);
}
62 changes: 62 additions & 0 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4049,3 +4049,65 @@ fn test_prevote_with_check_quorum() {
assert_eq!(network.peers[&2].state, StateRole::Leader, "peer 2 state",);
assert_eq!(network.peers[&3].state, StateRole::Follower, "peer 3 state",);
}

/// Verifies that a peer refuses to campaign, both on election timeout and
/// on leadership transfer, while a committed conf change is still unapplied,
/// and that it campaigns normally once the change has been applied.
#[test]
fn test_conf_change_check_before_campaign() {
    setup_for_test();
    let mut nt = Network::new(vec![None, None, None]);
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
    assert_eq!(nt.peers[&1].state, StateRole::Leader);

    // Propose the removal of node 3 through the leader.
    let mut proposal = new_message(1, 1, MessageType::MsgPropose, 0);
    let mut conf_change = ConfChange::new();
    conf_change.set_change_type(ConfChangeType::RemoveNode);
    conf_change.set_node_id(3);
    let mut entry = Entry::new();
    entry.set_entry_type(EntryType::EntryConfChange);
    entry.set_data(protobuf::Message::write_to_bytes(&conf_change).unwrap());
    proposal.mut_entries().push(entry);
    nt.send(vec![proposal]);

    // Tick node 2 through a full randomized election timeout.
    {
        let peer = nt.peers.get_mut(&2).unwrap();
        peer.reset_randomized_election_timeout();
        let ticks = peer.get_randomized_election_timeout();
        for _ in 0..ticks {
            peer.tick();
        }
    }
    // Node 2 must stay a follower: the committed conf change is not applied.
    assert_eq!(nt.peers[&2].state, StateRole::Follower);

    // Ask the leader to transfer leadership to node 2.
    nt.send(vec![new_message(2, 1, MessageType::MsgTransferLeader, 0)]);
    assert_eq!(nt.peers[&1].state, StateRole::Leader);
    // Still a follower for the same reason.
    assert_eq!(nt.peers[&2].state, StateRole::Follower);
    // Cancel the transfer that could not complete.
    nt.peers.get_mut(&1).unwrap().abort_leader_transfer();

    // Apply the conf change on node 2.
    let committed = nt.peers[&2].raft_log.committed;
    {
        let peer = nt.peers.get_mut(&2).unwrap();
        peer.raft_log.applied_to(committed);
        peer.remove_node(3);
    }

    // A second transfer attempt now succeeds.
    nt.send(vec![new_message(2, 1, MessageType::MsgTransferLeader, 0)]);
    assert_eq!(nt.peers[&1].state, StateRole::Follower);
    assert_eq!(nt.peers[&2].state, StateRole::Leader);

    // Apply the conf change on node 1, then let its election timer expire.
    {
        let peer = nt.peers.get_mut(&1).unwrap();
        peer.raft_log.applied_to(committed);
        peer.remove_node(3);
        peer.reset_randomized_election_timeout();
        let ticks = peer.get_randomized_election_timeout();
        for _ in 0..ticks {
            peer.tick();
        }
    }
    assert_eq!(nt.peers[&1].state, StateRole::Candidate);
}
6 changes: 4 additions & 2 deletions tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ fn new_raw_node(
&new_test_config(id, peers, election, heartbeat),
storage,
peer_nodes,
).unwrap()
)
.unwrap()
}

// test_raw_node_step ensures that RawNode.Step ignore local message.
Expand All @@ -101,7 +102,8 @@ fn test_raw_node_step() {
MessageType::MsgHup,
MessageType::MsgUnreachable,
MessageType::MsgSnapStatus,
].contains(msg_t)
]
.contains(msg_t)
{
assert_eq!(res, Err(Error::StepLocalMsg));
}
Expand Down
6 changes: 4 additions & 2 deletions tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,12 @@ impl Network {
.get(&Connem {
from: m.get_from(),
to: m.get_to(),
}).cloned()
})
.cloned()
.unwrap_or(0f64);
rand::random::<f64>() >= perc
}).collect()
})
.collect()
}

pub fn send(&mut self, msgs: Vec<Message>) {
Expand Down

0 comments on commit 64571f0

Please sign in to comment.