Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Fixed AuthorityRound deadlock on shutdown #8803

Merged
merged 1 commit into from
Jun 8, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 51 additions & 39 deletions ethcore/src/engines/authority_round/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,16 @@ impl Decodable for SealedEmptyStep {
}
}

struct PermissionedStep {
inner: Step,
can_propose: AtomicBool,
}

/// Engine using `AuthorityRound` proof-of-authority BFT consensus.
pub struct AuthorityRound {
transition_service: IoService<()>,
step: Arc<Step>,
can_propose: AtomicBool,
client: RwLock<Option<Weak<EngineClient>>>,
step: Arc<PermissionedStep>,
client: Arc<RwLock<Option<Weak<EngineClient>>>>,
signer: RwLock<EngineSigner>,
validators: Box<ValidatorSet>,
validate_score_transition: u64,
Expand All @@ -407,15 +411,15 @@ pub struct AuthorityRound {

// header-chain validator.
struct EpochVerifier {
step: Arc<Step>,
step: Arc<PermissionedStep>,
subchain_validators: SimpleList,
empty_steps_transition: u64,
}

impl super::EpochVerifier<EthereumMachine> for EpochVerifier {
fn verify_light(&self, header: &Header) -> Result<(), Error> {
// Validate the timestamp
verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?)?;
verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?)?;
// always check the seal since it's fast.
// nothing heavier to do.
verify_external(header, &self.subchain_validators, self.empty_steps_transition)
Expand Down Expand Up @@ -615,13 +619,15 @@ impl AuthorityRound {
let engine = Arc::new(
AuthorityRound {
transition_service: IoService::<()>::start()?,
step: Arc::new(Step {
inner: AtomicUsize::new(initial_step),
calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
step: Arc::new(PermissionedStep {
inner: Step {
inner: AtomicUsize::new(initial_step),
calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
},
can_propose: AtomicBool::new(true),
}),
can_propose: AtomicBool::new(true),
client: RwLock::new(None),
client: Arc::new(RwLock::new(None)),
signer: Default::default(),
validators: our_params.validators,
validate_score_transition: our_params.validate_score_transition,
Expand All @@ -641,7 +647,10 @@ impl AuthorityRound {

// Do not initialize timeouts for tests.
if should_timeout {
let handler = TransitionHandler { engine: Arc::downgrade(&engine) };
let handler = TransitionHandler {
step: engine.step.clone(),
client: engine.client.clone(),
};
engine.transition_service.register_handler(Arc::new(handler))?;
}
Ok(engine)
Expand All @@ -666,7 +675,7 @@ impl AuthorityRound {
}

fn generate_empty_step(&self, parent_hash: &H256) {
let step = self.step.load();
let step = self.step.inner.load();
let empty_step_rlp = empty_step_rlp(step, parent_hash);

if let Ok(signature) = self.sign(keccak(&empty_step_rlp)).map(Into::into) {
Expand Down Expand Up @@ -698,34 +707,37 @@ fn unix_now() -> Duration {
}

struct TransitionHandler {
engine: Weak<AuthorityRound>,
step: Arc<PermissionedStep>,
client: Arc<RwLock<Option<Weak<EngineClient>>>>,
}

const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;

impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() {
let remaining = engine.step.duration_remaining().as_millis();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}
let remaining = self.step.inner.duration_remaining().as_millis();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}

fn timeout(&self, io: &IoContext<()>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() {
// NOTE we might be lagging by couple of steps in case the timeout
// has not been called fast enough.
// Make sure to advance up to the actual step.
while engine.step.duration_remaining().as_millis() == 0 {
engine.step();
// NOTE we might be lagging by couple of steps in case the timeout
// has not been called fast enough.
// Make sure to advance up to the actual step.
while self.step.inner.duration_remaining().as_millis() == 0 {
self.step.inner.increment();
self.step.can_propose.store(true, AtomicOrdering::SeqCst);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth extracting into a function:

fn step(step: &PermissionedStep, client: &RwLock<Option<Weak<EngineClient>>>) {
	step.inner.increment();
	step.can_propose.store(true, AtomicOrdering::SeqCst);
	if let Some(ref weak) = *client.read() {
		if let Some(c) = weak.upgrade() {
			c.update_sealing();
		}
	}
}

to avoid code duplication?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code would also leak, cause it upgrades the reference to EngineClient from worker thread

if let Some(ref weak) = *self.client.read() {
if let Some(c) = weak.upgrade() {
c.update_sealing();
}
}

let next_run_at = engine.step.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}

let next_run_at = self.step.inner.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
}
}
Expand All @@ -742,8 +754,8 @@ impl Engine<EthereumMachine> for AuthorityRound {
}

fn step(&self) {
self.step.increment();
self.can_propose.store(true, AtomicOrdering::SeqCst);
self.step.inner.increment();
self.step.can_propose.store(true, AtomicOrdering::SeqCst);
if let Some(ref weak) = *self.client.read() {
if let Some(c) = weak.upgrade() {
c.update_sealing();
Expand Down Expand Up @@ -790,7 +802,7 @@ impl Engine<EthereumMachine> for AuthorityRound {

fn populate_from_parent(&self, header: &mut Header, parent: &Header) {
let parent_step = header_step(parent, self.empty_steps_transition).expect("Header has been verified; qed");
let current_step = self.step.load();
let current_step = self.step.inner.load();

let current_empty_steps_len = if header.number() >= self.empty_steps_transition {
self.empty_steps(parent_step.into(), current_step.into(), parent.hash()).len()
Expand All @@ -816,7 +828,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
let empty_step: EmptyStep = rlp.as_val().map_err(fmt_err)?;;

if empty_step.verify(&*self.validators).unwrap_or(false) {
if self.step.check_future(empty_step.step).is_ok() {
if self.step.inner.check_future(empty_step.step).is_ok() {
trace!(target: "engine", "handle_message: received empty step message {:?}", empty_step);
self.handle_empty_step_message(empty_step);
} else {
Expand All @@ -836,7 +848,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
fn generate_seal(&self, block: &ExecutedBlock, parent: &Header) -> Seal {
// first check to avoid generating signature most of the time
// (but there's still a race to the `compare_and_swap`)
if !self.can_propose.load(AtomicOrdering::SeqCst) {
if !self.step.can_propose.load(AtomicOrdering::SeqCst) {
trace!(target: "engine", "Aborting seal generation. Can't propose.");
return Seal::None;
}
Expand All @@ -845,7 +857,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
let parent_step: U256 = header_step(parent, self.empty_steps_transition)
.expect("Header has been verified; qed").into();

let step = self.step.load();
let step = self.step.inner.load();

// filter messages from old and future steps and different parents
let empty_steps = if header.number() >= self.empty_steps_transition {
Expand Down Expand Up @@ -922,7 +934,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
trace!(target: "engine", "generate_seal: Issuing a block for step {}.", step);

// only issue the seal if we were the first to reach the compare_and_swap.
if self.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) {
if self.step.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) {

self.clear_empty_steps(parent_step);

Expand Down Expand Up @@ -999,7 +1011,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
.decode()?;

let parent_step = header_step(&parent, self.empty_steps_transition)?;
let current_step = self.step.load();
let current_step = self.step.inner.load();
self.empty_steps(parent_step.into(), current_step.into(), parent.hash())
} else {
// we're verifying a block, extract empty steps from the seal
Expand Down Expand Up @@ -1052,7 +1064,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
// If yes then probably benign reporting needs to be moved further in the verification.
let set_number = header.number();

match verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?) {
match verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?) {
Err(BlockError::InvalidSeal) => {
self.validators.report_benign(header.author(), set_number, header.number());
Err(BlockError::InvalidSeal.into())
Expand Down Expand Up @@ -1294,7 +1306,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
// This way, upon encountering an epoch change, the proposer from the
// new set will be forced to wait until the next step to avoid sealing a
// block that breaks the invariant that the parent's step < the block's step.
self.can_propose.store(false, AtomicOrdering::SeqCst);
self.step.can_propose.store(false, AtomicOrdering::SeqCst);
return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof));
}
}
Expand Down