Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

AuthorityRound: finalize blocks #9113

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl<T: ChainDataFetcher> Client<T> {
The node may not be able to synchronize further.", e);
}

let epoch_proof = self.engine.is_epoch_end(
let epoch_proof = self.engine.is_epoch_end_light(
&verified_header,
&|h| self.chain.block_header(BlockId::Hash(h)).and_then(|hdr| hdr.decode().ok()),
&|h| self.chain.pending_transition(h),
Expand Down
23 changes: 16 additions & 7 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ impl Importer {
let number = header.number();
let parent = header.parent_hash();
let chain = client.chain.read();
let is_finalized = false;
let mut is_finalized = false;

// Commit results
let block = block.drain();
Expand Down Expand Up @@ -535,10 +535,18 @@ impl Importer {

state.journal_under(&mut batch, number, hash).expect("DB commit failed");

for ancestry_action in ancestry_actions {
let AncestryAction::MarkFinalized(ancestry) = ancestry_action;
chain.mark_finalized(&mut batch, ancestry).expect("Engine's ancestry action must be known blocks; qed");
}
let finalized: Vec<_> = ancestry_actions.into_iter().map(|ancestry_action| {
let AncestryAction::MarkFinalized(a) = ancestry_action;

if a != header.hash() {
chain.mark_finalized(&mut batch, a).expect("Engine's ancestry action must be known blocks; qed");
} else {
// we're finalizing the current block
is_finalized = true;
}

a
}).collect();

let route = chain.insert_block(&mut batch, block_data, receipts.clone(), ExtrasInsert {
fork_choice: fork_choice,
Expand All @@ -559,7 +567,7 @@ impl Importer {
client.db.read().key_value().write_buffered(batch);
chain.commit();

self.check_epoch_end(&header, &chain, client);
self.check_epoch_end(&header, &finalized, &chain, client);

client.update_last_hashes(&parent, hash);

Expand Down Expand Up @@ -666,9 +674,10 @@ impl Importer {
}

// check for ending of epoch and write transition if it occurs.
fn check_epoch_end<'a>(&self, header: &'a Header, chain: &BlockChain, client: &Client) {
fn check_epoch_end<'a>(&self, header: &'a Header, finalized: &'a [H256], chain: &BlockChain, client: &Client) {
let is_epoch_end = self.engine.is_epoch_end(
header,
finalized,
&(|hash| client.block_header_decoded(BlockId::Hash(hash))),
&(|hash| chain.get_pending_transition(hash)), // TODO: limit to current epoch.
);
Expand Down
17 changes: 5 additions & 12 deletions ethcore/src/engines/authority_round/finality.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ impl RollingFinality {
}

/// Get an iterator over stored hashes in order.
pub fn unfinalized_hashes(&self) -> Iter { Iter(self.headers.iter()) }
#[cfg(test)]
pub fn unfinalized_hashes(&self) -> impl Iterator<Item=&H256> {
self.headers.iter().map(|(h, _)| h)
}

/// Get the validator set.
pub fn validators(&self) -> &SimpleList { &self.signers }
Expand Down Expand Up @@ -145,16 +148,6 @@ impl RollingFinality {
}
}

pub struct Iter<'a>(::std::collections::vec_deque::Iter<'a, (H256, Vec<Address>)>);

impl<'a> Iterator for Iter<'a> {
type Item = H256;

fn next(&mut self) -> Option<H256> {
self.0.next().map(|&(h, _)| h)
}
}

#[cfg(test)]
mod tests {
use ethereum_types::{H256, Address};
Expand Down Expand Up @@ -220,7 +213,7 @@ mod tests {

// only the last hash has < 51% of authorities' signatures
assert_eq!(finality.unfinalized_hashes().count(), 1);
assert_eq!(finality.unfinalized_hashes().next(), Some(hashes[11].0));
assert_eq!(finality.unfinalized_hashes().next(), Some(&hashes[11].0));
assert_eq!(finality.subchain_head(), Some(hashes[11].0));
}
}
247 changes: 152 additions & 95 deletions ethcore/src/engines/authority_round/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use itertools::{self, Itertools};
use rlp::{encode, Decodable, DecoderError, Encodable, RlpStream, Rlp};
use ethereum_types::{H256, H520, Address, U128, U256};
use parking_lot::{Mutex, RwLock};
use types::ancestry_action::AncestryAction;
use unexpected::{Mismatch, OutOfBounds};

mod finality;
Expand Down Expand Up @@ -764,6 +765,67 @@ impl AuthorityRound {
}
}
}

// Returns the hashes of all ancestor blocks that are finalized by the given `chain_head`.
fn build_finality(&self, chain_head: &Header, ancestry: &mut Iterator<Item=Header>) -> Vec<H256> {
if self.immediate_transitions { return Vec::new() }

let client = match self.client.read().as_ref().and_then(|weak| weak.upgrade()) {
Some(client) => client,
None => {
warn!(target: "engine", "Unable to apply ancestry actions: missing client ref.");
return Vec::new();
}
};

let mut epoch_manager = self.epoch_manager.lock();
if !epoch_manager.zoom_to(&*client, &self.machine, &*self.validators, chain_head) {
return Vec::new();
}

if epoch_manager.finality_checker.subchain_head() != Some(*chain_head.parent_hash()) {
// build new finality checker from unfinalized ancestry of chain head, not including chain head itself yet.
trace!(target: "finality", "Building finality up to parent of {} ({})",
chain_head.hash(), chain_head.parent_hash());

// the empty steps messages in a header signal approval of the
// parent header.
let mut parent_empty_steps_signers = match header_empty_steps_signers(&chain_head, self.empty_steps_transition) {
Ok(empty_step_signers) => empty_step_signers,
Err(_) => {
warn!(target: "finality", "Failed to get empty step signatures from block {}", chain_head.hash());
return Vec::new();
}
};

let ancestry_iter = ancestry.map(|header| {
let mut signers = vec![header.author().clone()];
signers.extend(parent_empty_steps_signers.drain(..));

if let Ok(empty_step_signers) = header_empty_steps_signers(&header, self.empty_steps_transition) {
let res = (header.hash(), signers);
trace!(target: "finality", "Ancestry iteration: yielding {:?}", res);

parent_empty_steps_signers = empty_step_signers;

Some(res)

} else {
warn!(target: "finality", "Failed to get empty step signatures from block {}", header.hash());
None
}
})
.while_some();

if let Err(_) = epoch_manager.finality_checker.build_ancestry_subchain(ancestry_iter) {
debug!(target: "engine", "inconsistent validator set within epoch");
return Vec::new();
}
}

let finalized = epoch_manager.finality_checker.push_hash(chain_head.hash(), vec![chain_head.author().clone()]);
finalized.unwrap_or_default()
}
}

fn unix_now() -> Duration {
Expand Down Expand Up @@ -1226,7 +1288,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
self.validators.signals_epoch_end(first, header, aux)
}

fn is_epoch_end(
fn is_epoch_end_light(
&self,
chain_head: &Header,
chain: &super::Headers<Header>,
Expand All @@ -1235,116 +1297,98 @@ impl Engine<EthereumMachine> for AuthorityRound {
// epochs only matter if we want to support light clients.
if self.immediate_transitions { return None }

let first = chain_head.number() == 0;

// apply immediate transitions.
if let Some(change) = self.validators.is_epoch_end(first, chain_head) {
let change = combine_proofs(chain_head.number(), &change, &[]);
return Some(change)
}
let epoch_transition_hash = {
let client = match self.client.read().as_ref().and_then(|weak| weak.upgrade()) {
Some(client) => client,
None => {
warn!(target: "engine", "Unable to check for epoch end: missing client ref.");
return None;
}
};

let client = match self.client.read().as_ref().and_then(|weak| weak.upgrade()) {
Some(client) => client,
None => {
warn!(target: "engine", "Unable to check for epoch end: missing client ref.");
let mut epoch_manager = self.epoch_manager.lock();
if !epoch_manager.zoom_to(&*client, &self.machine, &*self.validators, chain_head) {
return None;
}

epoch_manager.epoch_transition_hash
};

// find most recently finalized blocks, then check transition store for pending transitions.
let mut epoch_manager = self.epoch_manager.lock();
if !epoch_manager.zoom_to(&*client, &self.machine, &*self.validators, chain_head) {
return None;
}
let mut hash = chain_head.parent_hash().clone();

if epoch_manager.finality_checker.subchain_head() != Some(*chain_head.parent_hash()) {
// build new finality checker from ancestry of chain head,
// not including chain head itself yet.
trace!(target: "finality", "Building finality up to parent of {} ({})",
chain_head.hash(), chain_head.parent_hash());
let mut ancestry = itertools::repeat_call(move || {
chain(hash).and_then(|header| {
if header.number() == 0 { return None }
hash = header.parent_hash().clone();
Some(header)
})
})
.while_some()
.take_while(|header| header.hash() != epoch_transition_hash);

let mut hash = chain_head.parent_hash().clone();
let mut parent_empty_steps_signers = match header_empty_steps_signers(&chain_head, self.empty_steps_transition) {
Ok(empty_step_signers) => empty_step_signers,
Err(_) => {
warn!(target: "finality", "Failed to get empty step signatures from block {}", chain_head.hash());
return None;
}
};
let finalized = self.build_finality(chain_head, &mut ancestry);

self.is_epoch_end(chain_head, &finalized, chain, transition_store)
}

fn is_epoch_end(
&self,
chain_head: &Header,
finalized: &[H256],
chain: &super::Headers<Header>,
transition_store: &super::PendingTransitionStore,
) -> Option<Vec<u8>> {
// epochs only matter if we want to support light clients.
if self.immediate_transitions { return None }

let epoch_transition_hash = epoch_manager.epoch_transition_hash;
let first = chain_head.number() == 0;

// walk the chain within current epoch backwards.
// author == ec_recover(sig) known since the blocks are in the DB.
// the empty steps messages in a header signal approval of the parent header.
let ancestry_iter = itertools::repeat_call(move || {
chain(hash).and_then(|header| {
if header.number() == 0 { return None }
// apply immediate transitions.
if let Some(change) = self.validators.is_epoch_end(first, chain_head) {
let change = combine_proofs(chain_head.number(), &change, &[]);
return Some(change)
}

let mut signers = vec![header.author().clone()];
signers.extend(parent_empty_steps_signers.drain(..));
// check transition store for pending transitions against recently finalized blocks
for finalized_hash in finalized {
if let Some(pending) = transition_store(*finalized_hash) {
// walk the chain backwards from current head until finalized_hash
// to construct transition proof. author == ec_recover(sig) known
// since the blocks are in the DB.
let mut hash = chain_head.hash();
let mut finality_proof: Vec<_> = itertools::repeat_call(move || {
chain(hash).and_then(|header| {
hash = *header.parent_hash();
if header.number() == 0 { return None }
else { return Some(header) }
})
})
.while_some()
.take_while(|h| h.hash() != *finalized_hash)
.collect();

if let Ok(empty_step_signers) = header_empty_steps_signers(&header, self.empty_steps_transition) {
let res = (hash, signers);
trace!(target: "finality", "Ancestry iteration: yielding {:?}", res);
let finalized_header = chain(*finalized_hash)
.expect("header is finalized; finalized headers must exist in the chain; qed");

hash = header.parent_hash().clone();
parent_empty_steps_signers = empty_step_signers;
let signal_number = finalized_header.number();
info!(target: "engine", "Applying validator set change signalled at block {}", signal_number);

Some(res)
finality_proof.push(finalized_header);
finality_proof.reverse();

} else {
warn!(target: "finality", "Failed to get empty step signatures from block {}", header.hash());
None
}
})
})
.while_some()
.take_while(|&(h, _)| h != epoch_transition_hash);
let finality_proof = ::rlp::encode_list(&finality_proof);

if let Err(_) = epoch_manager.finality_checker.build_ancestry_subchain(ancestry_iter) {
debug!(target: "engine", "inconsistent validator set within epoch");
return None;
}
}
self.epoch_manager.lock().note_new_epoch();

{
if let Ok(finalized) = epoch_manager.finality_checker.push_hash(chain_head.hash(), vec![chain_head.author().clone()]) {
let mut finalized = finalized.into_iter();
while let Some(finalized_hash) = finalized.next() {
if let Some(pending) = transition_store(finalized_hash) {
let finality_proof = ::std::iter::once(finalized_hash)
.chain(finalized)
.chain(epoch_manager.finality_checker.unfinalized_hashes())
.map(|h| if h == chain_head.hash() {
// chain closure only stores ancestry, but the chain head is also
// unfinalized.
chain_head.clone()
} else {
chain(h).expect("these headers fetched before when constructing finality checker; qed")
})
.collect::<Vec<Header>>();

// this gives us the block number for `hash`, assuming it's ancestry.
let signal_number = chain_head.number()
- finality_proof.len() as BlockNumber
+ 1;
let finality_proof = ::rlp::encode_list(&finality_proof);
epoch_manager.note_new_epoch();

info!(target: "engine", "Applying validator set change signalled at block {}", signal_number);

// We turn off can_propose here because upon validator set change there can
// be two valid proposers for a single step: one from the old set and
// one from the new.
//
// This way, upon encountering an epoch change, the proposer from the
// new set will be forced to wait until the next step to avoid sealing a
// block that breaks the invariant that the parent's step < the block's step.
self.step.can_propose.store(false, AtomicOrdering::SeqCst);
return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof));
}
}
// We turn off can_propose here because upon validator set change there can
// be two valid proposers for a single step: one from the old set and
// one from the new.
//
// This way, upon encountering an epoch change, the proposer from the
// new set will be forced to wait until the next step to avoid sealing a
// block that breaks the invariant that the parent's step < the block's step.
self.step.can_propose.store(false, AtomicOrdering::SeqCst);
return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof));
}
}

Expand Down Expand Up @@ -1399,6 +1443,19 @@ impl Engine<EthereumMachine> for AuthorityRound {
fn fork_choice(&self, new: &ExtendedHeader, current: &ExtendedHeader) -> super::ForkChoice {
super::total_difficulty_fork_choice(new, current)
}

fn ancestry_actions(&self, block: &ExecutedBlock, ancestry: &mut Iterator<Item=ExtendedHeader>) -> Vec<AncestryAction> {
let finalized = self.build_finality(
block.header(),
&mut ancestry.take_while(|e| !e.is_finalized).map(|e| e.header),
);

if !finalized.is_empty() {
debug!(target: "finality", "Finalizing blocks: {:?}", finalized);
}

finalized.into_iter().map(AncestryAction::MarkFinalized).collect()
}
}

#[cfg(test)]
Expand Down
Loading