Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix sync stalling on moving head (best block) prior to the tip of the chain sometimes #4091

Merged
merged 3 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1587,16 +1587,8 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo
}
}

match self.block_status(&BlockId::Hash(parent_hash))
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
BlockStatus::InChainWithState | BlockStatus::Queued => {},
BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
BlockStatus::InChainPruned if allow_missing_state => {},
BlockStatus::InChainPruned => return Ok(ImportResult::MissingState),
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}

// Own status must be checked first. If the block and its ancestry are pruned,
// this function must return `AlreadyInChain` rather than `MissingState`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment seems incongruous to the code that follows it:

BlockStatus::InChainPruned => {}

so it doesn't return AlreadyInChain ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

match self.block_status(&BlockId::Hash(hash))
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
Expand All @@ -1605,6 +1597,17 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}

match self.block_status(&BlockId::Hash(parent_hash))
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
BlockStatus::InChainWithState | BlockStatus::Queued => {},
BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
BlockStatus::InChainPruned if allow_missing_state => {},
BlockStatus::InChainPruned => return Ok(ImportResult::MissingState),
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}


Ok(ImportResult::imported(false))
}
}
Expand Down
4 changes: 3 additions & 1 deletion core/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ pub enum BlockImportError {
VerificationFailed(Option<Origin>, String),
/// Block is known to be Bad
BadBlock(Option<Origin>),
/// Parent state is missing.
MissingState,
/// Block has an unknown parent
UnknownParent,
/// Block import has been cancelled. This can happen if the parent block fails to be imported.
Expand Down Expand Up @@ -207,7 +209,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())),
Ok(ImportResult::MissingState) => {
debug!(target: "sync", "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash);
Err(BlockImportError::UnknownParent)
Err(BlockImportError::MissingState)
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash);
Expand Down
16 changes: 8 additions & 8 deletions core/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,12 +1009,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who.clone(), status.roles, status.best_number);
match self.sync.new_peer(who.clone(), info) {
Ok(None) => (),
Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
if info.roles.is_full() {
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
Ok(None) => (),
Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
}
}
}
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
Expand Down Expand Up @@ -1341,12 +1343,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {
let peers = self.context_data.peers.clone();
let results = self.sync.on_blocks_processed(
imported,
count,
results,
|peer_id| peers.get(peer_id).map(|i| i.info.clone())
);
for result in results {
match result {
Expand Down
68 changes: 34 additions & 34 deletions core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
config::{Roles, BoxFinalityProofRequestBuilder},
message::{self, generic::FinalityProofRequest, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
FinalityProofResponse},
protocol
};
use either::Either;
use extra_requests::ExtraRequests;
Expand Down Expand Up @@ -347,23 +346,22 @@ impl<B: BlockT> ChainSync<B> {
/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
pub fn new_peer(&mut self, who: PeerId, info: protocol::PeerInfo<B>) -> Result<Option<BlockRequest<B>>, BadPeer> {
pub fn new_peer(&mut self, who: PeerId, best_hash: B::Hash, best_number: NumberFor<B>)
-> Result<Option<BlockRequest<B>>, BadPeer>
{
// There is nothing sync can get from a node that has no blockchain data.
if !info.roles.is_full() {
return Ok(None)
}
match self.block_status(&info.best_hash) {
match self.block_status(&best_hash) {
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
Err(BadPeer(who, BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE))
}
Ok(BlockStatus::KnownBad) => {
info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
info!("New peer with known bad best block {} ({}).", best_hash, best_number);
Err(BadPeer(who, i32::min_value()))
}
Ok(BlockStatus::Unknown) => {
if info.best_number.is_zero() {
info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
if best_number.is_zero() {
info!("New peer with unknown genesis hash {} ({}).", best_hash, best_number);
return Err(BadPeer(who, i32::min_value()))
}
// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
Expand All @@ -378,8 +376,8 @@ impl<B: BlockT> ChainSync<B> {
);
self.peers.insert(who, PeerSync {
common_number: self.best_queued_number,
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default()
});
Expand All @@ -388,30 +386,30 @@ impl<B: BlockT> ChainSync<B> {

// If we are at genesis, just start downloading.
if self.best_queued_number.is_zero() {
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with best hash {} ({}).", best_hash, best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.is_idle = false;
return Ok(None)
}

let common_best = std::cmp::min(self.best_queued_number, info.best_number);
let common_best = std::cmp::min(self.best_queued_number, best_number);

debug!(target:"sync",
"New peer with unknown best hash {} ({}), searching for common ancestor.",
info.best_hash,
info.best_number
best_hash,
best_number
);

self.peers.insert(who, PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::AncestorSearch(
common_best,
AncestorSearchState::ExponentialBackoff(One::one())
Expand All @@ -423,11 +421,11 @@ impl<B: BlockT> ChainSync<B> {
Ok(Some(ancestry_request::<B>(common_best)))
}
Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with known best hash {} ({}).", best_hash, best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: info.best_number,
best_hash: info.best_hash,
best_number: info.best_number,
common_number: best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
Expand Down Expand Up @@ -849,7 +847,6 @@ impl<B: BlockT> ChainSync<B> {
imported: usize,
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
mut peer_info: impl FnMut(&PeerId) -> Option<protocol::PeerInfo<B>>
) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
trace!(target: "sync", "Imported {} of {}", imported, count);

Expand Down Expand Up @@ -902,27 +899,33 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = who {
info!("Peer sent block with incomplete header to import");
output.push(Err(BadPeer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
info!("Verification failed from peer: {}", e);
output.push(Err(BadPeer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
info!("Bad block");
output.push(Err(BadPeer(peer, BAD_BLOCK_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting blocks on has been discarded
// in the meantime because another chain has been finalized.
// Don't mark it as bad, as it may still be synced if explicitly requested.
trace!(target: "sync", "Obsolete block");
},
Err(BlockImportError::UnknownParent) |
Err(BlockImportError::Cancelled) |
Err(BlockImportError::Other(_)) => {
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
},
};
}
Expand Down Expand Up @@ -1121,9 +1124,7 @@ impl<B: BlockT> ChainSync<B> {
}

/// Restart the sync process.
fn restart<'a, F>
(&'a mut self, mut peer_info: F) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
where F: FnMut(&PeerId) -> Option<protocol::PeerInfo<B>> + 'a
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
{
self.queue_blocks.clear();
self.best_importing_number = Zero::zero();
Expand All @@ -1134,9 +1135,8 @@ impl<B: BlockT> ChainSync<B> {
self.is_idle = false;
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::replace(&mut self.peers, HashMap::new());
old_peers.into_iter().filter_map(move |(id, _)| {
let info = peer_info(&id)?;
match self.new_peer(id.clone(), info) {
old_peers.into_iter().filter_map(move |(id, p)| {
match self.new_peer(id.clone(), p.best_hash, p.best_number) {
Ok(None) => None,
Ok(Some(x)) => Some(Ok((id, x))),
Err(e) => Some(Err(e))
Expand Down